Skip to content

Commit

Permalink
connection: fix: reader go-routine is leaked on connection close
Browse files Browse the repository at this point in the history
When a message was sent and it's response was received while the
connection was closed or an error happened, the reader go-routine could
get stuck and be leaked.

The reader go routine tries to send a received message to the unbuffered
c.rpc channel via the dispatch0() and dispatchN() methods.
The call() method reads from the rpc channel.
If an error happened while the dispatch method sends a message to the
rpc channel, the call() method could terminate because it read an error
from c.errors or because c.errors was closed.

To prevent the scenario:
- the reader go-routine now closes c.rpc when it terminates,
- The call() method, reads from c.rpc until a message was received or it
  is closed. When c.rpc is closed, it reads an error from c.errors or
  wait until c.errors is closed.
  When it reads an error, it returns it. If it is closed it returns
  ErrClosed.

This ensures that the messages is read from c.rpc before call() returns.
It also ensures that when a message was received that it is processed.
Previously it could happen that the message was silently ignored because
c.errors returned an error or was closed.

tests: add testcase to ensure reader routine terminates

Add a testcase for the bug that the reader go-routine tries to send a
message to the buffered rpc channel but call() terminated because it
read an error from the errors chan or the errors chan was closed.
It cause that reader routine gets stuck forever and does not terminate
when the connection is closed.
More information: #69.

This testcase does not reproduce the issue reliably, but it is triggered
in ~80% of executions.

Bump GH actions versions

add missing go.sum file

tests/TestRequiredServerLocale: close connection on testcase termination

The TestRequiredServerLocale testcase was not closing the connection
that it opened.
This caused the goleak detector in the
TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose testcase to
complain about a leaked heartbeat go-routine.

tests: remove duplicate goleak invocation

goleak is now called in TestMain().
The invocation in the
TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose testcase can
be removed.

tests/ExampleConnection_reconnect: close connection on termination

ExampleConnection_reconnect was not closing the opened connection on
termination. This caused the goleak checker to complain about a leaked
heartbeat go routine.

Close the connection.
  • Loading branch information
fho authored and lukebakken committed Apr 19, 2022
1 parent 02f3715 commit 2dfde48
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
- 5672:5672
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Set up Go environment
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Tests
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ list: ## list Makefile targets
@echo "The most used targets: \n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'


fmt: ## Run go fmt against code
go fmt ./...


vet: ## Run go vet against code
go vet ./...

Expand Down
31 changes: 16 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) {
frames := &reader{buf}
conn, haveDeadliner := r.(readDeadliner)

defer close(c.rpc)

for {
frame, err := frames.ReadFrame()

Expand Down Expand Up @@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error {
}
}

select {
case err, ok := <-c.errors:
if !ok {
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
}
return err
}

case msg := <-c.rpc:
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
return ErrCommandInvalid
}
// unreachable
return ErrCommandInvalid
}

// Connection = open-Connection *use-Connection close-Connection
Expand Down
28 changes: 28 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,31 @@ func TestChannelIsClosed(t *testing.T) {
t.Fatal("channel expected to be marked as closed")
}
}

// TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose tests the issue
// described in https://github.com/rabbitmq/amqp091-go/issues/69.
func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
const routines = 10
c := integrationConnection(t, t.Name())

var wg sync.WaitGroup
startSigCh := make(chan interface{})

for i := 0; i < routines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

<-startSigCh

err := c.Close()
if err != nil {
t.Logf("close failed in routine %d: %s", id, err.Error())
}
}(i)
}
close(startSigCh)

t.Log("waiting for go-routines to terminate")
wg.Wait()
}
3 changes: 2 additions & 1 deletion reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func ExampleConnection_reconnect() {
// between reconnects, shares the same topology as the consumer. If we rather
// sent all messages up front, the first consumer would receive every message.
// We would rather show how the messages are not lost between reconnects.
_, pub, err := setup(url, queue)
con, pub, err := setup(url, queue)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
defer con.Close()

// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
Expand Down

0 comments on commit 2dfde48

Please sign in to comment.