connection: fix: reader go-routine is leaked on connection close
When a message was sent and its response was received while the
connection was closed or an error happened, the reader go-routine could
get stuck and be leaked.

The reader go-routine tries to send a received message to the unbuffered
c.rpc channel via the dispatch0() and dispatchN() methods.
The call() method reads from the c.rpc channel.
If an error happened while the dispatch method was sending a message to
c.rpc, the call() method could terminate because it read an error from
c.errors or because c.errors was closed.
Nothing received from c.rpc after that, so the reader's send blocked
forever.
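The stuck send can be modelled in isolation. The following is a minimal sketch, not the library's code: rpc, errs, done and readerLeaks are hypothetical stand-ins, and the caller is simplified to a plain receive from errs to represent the old select taking the errors branch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// readerLeaks models the pre-fix race with hypothetical stand-ins for
// c.rpc and c.errors. It reports whether the reader goroutine is still
// blocked after the caller has returned.
func readerLeaks() bool {
	// Unbuffered, like c.rpc: a send blocks until someone receives.
	rpc := make(chan string)
	// Stand-in for c.errors.
	errs := make(chan error, 1)
	// Closed when the reader goroutine exits.
	done := make(chan struct{})

	// Reader: blocks in the send until a receiver shows up on rpc.
	go func() {
		defer close(done)
		rpc <- "response"
	}()

	// An error arrives while the reader is trying to dispatch.
	errs <- errors.New("connection error")

	// Caller: models the old select taking the errors branch and
	// returning. Nothing receives from rpc afterwards.
	<-errs

	select {
	case <-done:
		return false
	case <-time.After(100 * time.Millisecond):
		// The reader is still stuck in its send.
		return true
	}
}

func main() {
	fmt.Println("reader leaked:", readerLeaks())
}
```

Because no goroutine ever receives from rpc in this sketch, the reader never reaches its deferred close(done), which is the leak in miniature.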

To prevent this scenario:
- The reader go-routine now closes c.rpc when it terminates.
- The call() method reads from c.rpc until a message is received or the
  channel is closed. When c.rpc is closed, it reads an error from
  c.errors or waits until c.errors is closed.
  If it reads an error, it returns that error. If c.errors is closed, it
  returns ErrClosed.

This ensures that the message is read from c.rpc before call() returns.
It also ensures that a received message is processed.
Previously the message could be silently ignored because c.errors
returned an error or was closed.
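The receive order described above can be sketched outside the library. This is an illustration under assumptions, not the actual implementation: call and reader here are simplified free functions over string messages, and errClosed and errConn are hypothetical stand-ins for the library's error values.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the library's ErrClosed and for an error
// delivered on c.errors.
var (
	errClosed = errors.New("connection closed")
	errConn   = errors.New("connection error")
)

// call drains rpc first and consults errs only once rpc has been
// closed by the reader.
func call(rpc <-chan string, errs <-chan error) (string, error) {
	msg, ok := <-rpc
	if ok {
		return msg, nil
	}
	err, open := <-errs
	if !open {
		return "", errClosed
	}
	return "", err
}

// reader sends the given messages and closes rpc when it terminates,
// in the spirit of the added defer close(c.rpc).
func reader(rpc chan<- string, msgs ...string) {
	defer close(rpc)
	for _, m := range msgs {
		rpc <- m
	}
}

func main() {
	// A response and an error are pending at the same time.
	rpc := make(chan string)
	errs := make(chan error, 1)
	errs <- errConn
	close(errs)
	go reader(rpc, "response")

	fmt.Println(call(rpc, errs)) // the message is delivered first
	fmt.Println(call(rpc, errs)) // rpc is closed: the pending error
	fmt.Println(call(rpc, errs)) // both channels closed: errClosed
}
```

The sketch relies on two channel properties: a receive from a closed channel returns immediately with ok set to false, and values buffered before a close are still delivered. The reader can therefore always finish its send, and the caller still sees the error afterwards.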

tests: add testcase to ensure reader routine terminates

Add a testcase for the bug where the reader go-routine tries to send a
message to the unbuffered rpc channel but call() has already terminated
because it read an error from the errors chan or the errors chan was
closed.
This causes the reader go-routine to get stuck forever, so it does not
terminate when the connection is closed.
More information: rabbitmq#69.

This testcase does not reproduce the issue reliably, but it is triggered
in ~80% of executions.
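The testcase relies on goleak.VerifyNone(t) to detect the stuck go-routine. To illustrate the general idea of such a check without the dependency, here is a crude stdlib-only sketch. The leaks helper is hypothetical, it only compares go-routine counts, and it should not be read as a description of how goleak works internally.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// leaks runs f and reports whether the goroutine count is still above
// its starting value after a grace period. It is a rough approximation
// of a leak check, not a replacement for a real detector.
func leaks(f func()) bool {
	before := runtime.NumGoroutine()
	f()
	deadline := time.Now().Add(500 * time.Millisecond)
	for time.Now().Before(deadline) {
		if runtime.NumGoroutine() <= before {
			return false
		}
		// Give goroutines started by f a chance to finish.
		time.Sleep(5 * time.Millisecond)
	}
	return true
}

func main() {
	stuck := make(chan struct{})

	// A goroutine that blocks forever on a receive, like the stuck reader.
	fmt.Println(leaks(func() { go func() { <-stuck }() }))

	// A goroutine that returns on its own.
	fmt.Println(leaks(func() { go func() {}() }))
}
```

The polling loop with a deadline is a deliberate choice: go-routines that are about to exit need a moment to do so, so a single snapshot taken right after f returns would report false positives.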

Bump GH actions versions

Add step in GH actions to download goleak dependency
fho authored and lukebakken committed Apr 14, 2022
1 parent 6cac2fa commit 3e2ea21
Showing 6 changed files with 57 additions and 17 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/tests.yml
@@ -20,10 +20,12 @@ jobs:
           - 5672:5672
     steps:
       - name: Check out code into the Go module directory
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
       - name: Set up Go environment
-        uses: actions/setup-go@v2
+        uses: actions/setup-go@v3
         with:
           go-version: ${{ matrix.go-version }}
+      - name: Dependencies
+        run: go mod download go.uber.org/goleak
       - name: Tests
         run: AMQP_URL=amqp://guest:guest@localhost:5672/ go test -cpu=1,2 -v -tags integration
1 change: 1 addition & 0 deletions CONTRIBUTING.md
@@ -2,6 +2,7 @@
 
 1. Go: [https://golang.org/dl/](https://golang.org/dl/)
 1. Golint `go get -u -v github.com/golang/lint/golint`
+1. Goleak `go mod download go.uber.org/goleak`
 
 ## Contributing
 
3 changes: 3 additions & 0 deletions Makefile
@@ -6,6 +6,9 @@ list: ## list Makefile targets
 	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
 
 
+deps:
+	go mod download go.uber.org/goleak
+
 fmt: ## Run go fmt against code
 	go fmt ./...
 
31 changes: 16 additions & 15 deletions connection.go
@@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) {
 	frames := &reader{buf}
 	conn, haveDeadliner := r.(readDeadliner)
 
+	defer close(c.rpc)
+
 	for {
 		frame, err := frames.ReadFrame()
 
@@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error {
 		}
 	}
 
-	select {
-	case err, ok := <-c.errors:
-		if !ok {
+	msg, ok := <-c.rpc
+	if !ok {
+		err, errorsChanIsOpen := <-c.errors
+		if !errorsChanIsOpen {
 			return ErrClosed
 		}
 		return err
+	}
 
-	case msg := <-c.rpc:
-		// Try to match one of the result types
-		for _, try := range res {
-			if reflect.TypeOf(msg) == reflect.TypeOf(try) {
-				// *res = *msg
-				vres := reflect.ValueOf(try).Elem()
-				vmsg := reflect.ValueOf(msg).Elem()
-				vres.Set(vmsg)
-				return nil
-			}
+	// Try to match one of the result types
+	for _, try := range res {
+		if reflect.TypeOf(msg) == reflect.TypeOf(try) {
+			// *res = *msg
+			vres := reflect.ValueOf(try).Elem()
+			vmsg := reflect.ValueOf(msg).Elem()
+			vres.Set(vmsg)
+			return nil
 		}
-		return ErrCommandInvalid
 	}
-	// unreachable
+	return ErrCommandInvalid
 }
 
 // Connection = open-Connection *use-Connection close-Connection
31 changes: 31 additions & 0 deletions connection_test.go
@@ -14,6 +14,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"go.uber.org/goleak"
 )
 
 func TestRequiredServerLocale(t *testing.T) {
@@ -210,3 +212,32 @@ func TestChannelIsClosed(t *testing.T) {
 		t.Fatal("channel expected to be marked as closed")
 	}
 }
+
+// TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose tests the issue
+// described in https://github.com/rabbitmq/amqp091-go/issues/69.
+func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
+	const routines = 10
+	defer goleak.VerifyNone(t)
+	c := integrationConnection(t, t.Name())
+
+	var wg sync.WaitGroup
+	startSigCh := make(chan interface{})
+
+	for i := 0; i < routines; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+
+			<-startSigCh
+
+			err := c.Close()
+			if err != nil {
+				t.Logf("close failed in routine %d: %s", id, err.Error())
+			}
+		}(i)
+	}
+	close(startSigCh)
+
+	t.Log("waiting for go-routines to terminate")
+	wg.Wait()
+}
2 changes: 2 additions & 0 deletions go.mod
@@ -1,3 +1,5 @@
 module github.com/rabbitmq/amqp091-go
 
 go 1.16
+
+require go.uber.org/goleak v1.1.12
