
reader go routine hangs and leaks when Connection.Close() is called multiple times #69

Closed
fho opened this issue Apr 13, 2022 · 0 comments · Fixed by #70

fho commented Apr 13, 2022

Hello,

We are using go.uber.org/goleak for our internal amqp wrapper package and stumbled over a goroutine leak.
This happens in an internal testcase where we send 2 messages to a non-existent exchange. Our reconnect goroutine receives the resulting error on the NotifyClose channel and then closes the connection; after the reconnect goroutine has terminated, our client closes the connection as well.
In our testcase we call Close only once on the channel (which is unnecessary) and once on the connection.

I managed to reproduce the same hang and goroutine leak by calling Close in parallel on the same connection. This is not the exact scenario from our internal testcase, but it reproduces the problem. :-)

I'm using amqp091-go commit 6cac2fa.

This issue can be reproduced with the following testcase:

//go:build integration
// +build integration

package amqp091

import (
	"sync"
	"testing"

	"go.uber.org/goleak"
)

func TestGoRoutineLeakOnParallelConClose(t *testing.T) {
	const routines = 2
	defer goleak.VerifyNone(t)
	c := integrationConnection(t, t.Name())

	var wg sync.WaitGroup
	startSigCh := make(chan interface{})

	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			<-startSigCh

			err := c.Close()
			if err != nil {
				t.Logf("close failed in routine %d: %s", id, err.Error())
			}
		}(i)
	}
	close(startSigCh)

	t.Log("waiting for go-routines to terminate")
	wg.Wait()
}

Test output:

=== RUN   TestGoRoutineLeakOnParallelConClose
    bug_test.go:36: waiting for go-routines to terminate
    bug_test.go:30: close failed in routine 0: Exception (504) Reason: "channel/connection is not open"
    leaks.go:78: found unexpected goroutines:
        [Goroutine 11 in state chan send, with github.com/rabbitmq/amqp091-go.(*Connection).dispatch0 on top of the stack:
        goroutine 11 [chan send]:
        github.com/rabbitmq/amqp091-go.(*Connection).dispatch0(0xc0001263c0, {0x8c5828?, 0xc00000e240})
        	/home/fho/git/amqp091-go/connection.go:483 +0x40f
        github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc00005af18?, {0x8c5828, 0xc00000e240})
        	/home/fho/git/amqp091-go/connection.go:456 +0x59
        github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0001263c0, {0x8c4980?, 0xc000010068})
        	/home/fho/git/amqp091-go/connection.go:550 +0x225
        created by github.com/rabbitmq/amqp091-go.Open
        	/home/fho/git/amqp091-go/connection.go:251 +0x5eb
        ]
--- FAIL: TestGoRoutineLeakOnParallelConClose (0.46s)

On my machine it happens on almost every run, although some runs succeed without the leak.
It may be necessary to run the testcase several times to trigger the issue:

while go test -race -count=1 -run=TestGoRoutineLeakOnParallelConClose -v -tags integration; do : ; done

Update:
I think I now understand how it happens:

  • Close() is called twice, and both calls send a close message.[1]
  • For both messages, the reader reads the response frame, calls c.demux() and passes the msg to the rpc chan.[2]
  • The shutdown closes the errors channel.[3]
  • For one of the received responses, call() has not read the msg from the rpc channel yet; it is waiting in the select loop.[4] Because the errors channel is closed, call() returns without reading the msg from the rpc channel, and the reader goroutine hangs forever in dispatch0() trying to send the msg to the rpc channel,[2] because that channel is unbuffered.

I guess this scenario, where call() returns before reading a msg from the rpc chan, could also be triggered when Close() is called only once but an error happens shortly after a message response was received:
call() could return because it reads an error from c.errors while dispatch0 is sending a message to the rpc chan.

Footnotes

  1. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L350

  2. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L483

  3. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L425

  4. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L692

fho added a commit to fho/amqp091-go that referenced this issue Apr 13, 2022
Add a testcase for the bug where the reader goroutine tries to send a
message to the unbuffered rpc channel, but call() has already terminated
because it read an error from the errors chan or the errors chan was
closed. This causes the reader routine to get stuck forever instead of
terminating when the connection is closed.
More information: rabbitmq#69.

This testcase does not reproduce the issue reliably, but it is triggered
in ~80% of executions.
lukebakken added this to the 1.3.5 milestone Apr 14, 2022
lukebakken added a commit to fho/amqp091-go that referenced this issue Apr 14, 2022
lukebakken pushed a commit to fho/amqp091-go that referenced this issue Apr 14, 2022
When a message was sent and its response was received while the
connection was being closed or an error happened, the reader goroutine
could get stuck and be leaked.

The reader goroutine tries to send a received message to the unbuffered
c.rpc channel via the dispatch0() and dispatchN() methods.
The call() method reads from the rpc channel.
If an error happened while the dispatch method was sending a message to
the rpc channel, the call() method could terminate because it read an
error from c.errors or because c.errors was closed.

To prevent this scenario:
- the reader goroutine now closes c.rpc when it terminates,
- the call() method reads from c.rpc until a message is received or the
  channel is closed. When c.rpc is closed, it reads an error from
  c.errors or waits until c.errors is closed.
  If it reads an error, it returns it; if c.errors is closed, it returns
  ErrClosed.

This ensures that the message is read from c.rpc before call() returns.
It also ensures that a received message is actually processed.
Previously the message could be silently ignored because an error was
read from c.errors or c.errors was closed.
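A minimal sketch of the hand-off described in this commit message, again using hypothetical stand-ins (`conn`, `reader`, `call`, `errClosed`) rather than the actual amqp091-go code. It assumes c.errors is buffered with capacity 1 so the terminating reader can record its shutdown error even when no call() is currently waiting.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a hypothetical stand-in for amqp091-go's ErrClosed.
var errClosed = errors.New("connection closed")

// conn is a hypothetical, simplified stand-in for Connection.
type conn struct {
	rpc    chan string // unbuffered: the reader hands responses to call()
	errors chan error  // assumed buffered (cap 1) so the reader never blocks on it
}

// reader models the reader goroutine: it delivers each response it read,
// then, on termination, closes rpc, records the shutdown error (if any)
// and closes errors.
func (c *conn) reader(responses []string, shutdownErr error) {
	for _, r := range responses {
		c.rpc <- r // blocks until call() receives it
	}
	close(c.rpc)
	if shutdownErr != nil {
		c.errors <- shutdownErr
	}
	close(c.errors)
}

// call models the fixed logic: drain rpc first, and only consult errors
// once rpc has been closed by the terminating reader.
func (c *conn) call() (string, error) {
	if msg, ok := <-c.rpc; ok {
		return msg, nil
	}
	if err, ok := <-c.errors; ok {
		return "", err
	}
	return "", errClosed
}

func main() {
	c := &conn{rpc: make(chan string), errors: make(chan error, 1)}
	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)
		c.reader([]string{"close-ok"}, errors.New("server closed connection"))
	}()

	msg, err := c.call()
	fmt.Println("first call:", msg, err) // first call: close-ok <nil>
	<-readerDone                          // the reader terminates instead of leaking

	_, err = c.call()
	fmt.Println("second call:", err) // second call: server closed connection
	_, err = c.call()
	fmt.Println("third call:", err) // third call: connection closed
}
```

Because call() in this model only looks at c.errors after c.rpc has been closed, a response the reader has already read is always received, so the reader's send cannot be stranded.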

tests: add testcase to ensure reader routine terminates

Add a testcase for the bug where the reader goroutine tries to send a
message to the unbuffered rpc channel, but call() has already terminated
because it read an error from the errors chan or the errors chan was
closed. This causes the reader routine to get stuck forever instead of
terminating when the connection is closed.
More information: rabbitmq#69.

This testcase does not reproduce the issue reliably, but it is triggered
in ~80% of executions.

Bump GH actions versions

Add step in GH actions to download goleak dependency
lukebakken added a commit to fho/amqp091-go that referenced this issue Apr 19, 2022
lukebakken pushed a commit to fho/amqp091-go that referenced this issue Apr 19, 2022
When a message was sent and its response was received while the
connection was being closed or an error happened, the reader goroutine
could get stuck and be leaked.

The reader goroutine tries to send a received message to the unbuffered
c.rpc channel via the dispatch0() and dispatchN() methods.
The call() method reads from the rpc channel.
If an error happened while the dispatch method was sending a message to
the rpc channel, the call() method could terminate because it read an
error from c.errors or because c.errors was closed.

To prevent this scenario:
- the reader goroutine now closes c.rpc when it terminates,
- the call() method reads from c.rpc until a message is received or the
  channel is closed. When c.rpc is closed, it reads an error from
  c.errors or waits until c.errors is closed.
  If it reads an error, it returns it; if c.errors is closed, it returns
  ErrClosed.

This ensures that the message is read from c.rpc before call() returns.
It also ensures that a received message is actually processed.
Previously the message could be silently ignored because an error was
read from c.errors or c.errors was closed.

tests: add testcase to ensure reader routine terminates

Add a testcase for the bug where the reader goroutine tries to send a
message to the unbuffered rpc channel, but call() has already terminated
because it read an error from the errors chan or the errors chan was
closed. This causes the reader routine to get stuck forever instead of
terminating when the connection is closed.
More information: rabbitmq#69.

This testcase does not reproduce the issue reliably, but it is triggered
in ~80% of executions.

Bump GH actions versions

add missing go.sum file

tests/TestRequiredServerLocale: close connection on testcase termination

The TestRequiredServerLocale testcase was not closing the connection
that it opened.
This caused the goleak detector in the
TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose testcase to
complain about a leaked heartbeat go-routine.

tests: remove duplicate goleak invocation

goleak is now called in TestMain().
The invocation in the
TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose testcase can
be removed.

tests/ExampleConnection_reconnect: close connection on termination

ExampleConnection_reconnect was not closing the opened connection on
termination. This caused the goleak checker to complain about a leaked
heartbeat go routine.

Close the connection.