Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection: fix: reader go-routine is leaked on connection close #70

Merged
merged 1 commit into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
- 5672:5672
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Set up Go environment
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Tests
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ list: ## list Makefile targets
@echo "The most used targets: \n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'


fmt: ## Run go fmt against code
go fmt ./...


vet: ## Run go vet against code
go vet ./...

Expand Down
31 changes: 16 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) {
frames := &reader{buf}
conn, haveDeadliner := r.(readDeadliner)

defer close(c.rpc)

for {
frame, err := frames.ReadFrame()

Expand Down Expand Up @@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error {
}
}

select {
case err, ok := <-c.errors:
if !ok {
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
}
return err
}

case msg := <-c.rpc:
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
return ErrCommandInvalid
}
// unreachable
return ErrCommandInvalid
}

// Connection = open-Connection *use-Connection close-Connection
Expand Down
28 changes: 28 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,31 @@ func TestChannelIsClosed(t *testing.T) {
t.Fatal("channel expected to be marked as closed")
}
}

// TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose tests the issue
// described in https://github.com/rabbitmq/amqp091-go/issues/69.
func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
const routines = 10
c := integrationConnection(t, t.Name())

var wg sync.WaitGroup
startSigCh := make(chan interface{})

for i := 0; i < routines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

<-startSigCh

err := c.Close()
if err != nil {
t.Logf("close failed in routine %d: %s", id, err.Error())
}
}(i)
}
close(startSigCh)

t.Log("waiting for go-routines to terminate")
wg.Wait()
}
3 changes: 2 additions & 1 deletion reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func ExampleConnection_reconnect() {
// between reconnects, shares the same topology as the consumer. If we rather
// sent all messages up front, the first consumer would receive every message.
// We would rather show how the messages are not lost between reconnects.
_, pub, err := setup(url, queue)
con, pub, err := setup(url, queue)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
defer con.Close()

// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
Expand Down