Skip to content

Commit

Permalink
Merge pull request #70 from fho/reader_routine_leak
Browse files Browse the repository at this point in the history
connection: fix: reader go-routine is leaked on connection close
  • Loading branch information
lukebakken committed Apr 19, 2022
2 parents 02f3715 + 2dfde48 commit 5c7877f
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
- 5672:5672
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Set up Go environment
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
- name: Tests
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ list: ## list Makefile targets
@echo "The most used targets: \n"
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'


fmt: ## Run go fmt against code
go fmt ./...


vet: ## Run go vet against code
go vet ./...

Expand Down
31 changes: 16 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ func (c *Connection) reader(r io.Reader) {
frames := &reader{buf}
conn, haveDeadliner := r.(readDeadliner)

defer close(c.rpc)

for {
frame, err := frames.ReadFrame()

Expand Down Expand Up @@ -689,27 +691,26 @@ func (c *Connection) call(req message, res ...message) error {
}
}

select {
case err, ok := <-c.errors:
if !ok {
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
}
return err
}

case msg := <-c.rpc:
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
// Try to match one of the result types
for _, try := range res {
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
// *res = *msg
vres := reflect.ValueOf(try).Elem()
vmsg := reflect.ValueOf(msg).Elem()
vres.Set(vmsg)
return nil
}
return ErrCommandInvalid
}
// unreachable
return ErrCommandInvalid
}

// Connection = open-Connection *use-Connection close-Connection
Expand Down
28 changes: 28 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,31 @@ func TestChannelIsClosed(t *testing.T) {
t.Fatal("channel expected to be marked as closed")
}
}

// TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose tests the issue
// described in https://github.com/rabbitmq/amqp091-go/issues/69.
func TestReaderGoRoutineTerminatesWhenMsgIsProcessedDuringClose(t *testing.T) {
const routines = 10
c := integrationConnection(t, t.Name())

var wg sync.WaitGroup
startSigCh := make(chan interface{})

for i := 0; i < routines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

<-startSigCh

err := c.Close()
if err != nil {
t.Logf("close failed in routine %d: %s", id, err.Error())
}
}(i)
}
close(startSigCh)

t.Log("waiting for go-routines to terminate")
wg.Wait()
}
3 changes: 2 additions & 1 deletion reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func ExampleConnection_reconnect() {
// between reconnects, shares the same topology as the consumer. If we rather
// sent all messages up front, the first consumer would receive every message.
// We would rather show how the messages are not lost between reconnects.
_, pub, err := setup(url, queue)
con, pub, err := setup(url, queue)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
defer con.Close()

// Purge the queue from the publisher side to establish initial state
if _, err := pub.QueuePurge(queue, false); err != nil {
Expand Down

0 comments on commit 5c7877f

Please sign in to comment.