Resolve rare deadlock caused by loss of connection (in particular circumstances) #512

Merged
merged 1 commit on Jun 3, 2021
17 changes: 16 additions & 1 deletion client.go
@@ -607,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
// Care is needed here because an error elsewhere could trigger a deadlock
sendPubLoop:
for {
select {
case incomingPubChan <- pub:
break sendPubLoop
case err, ok := <-commsErrors:
if !ok { // commsErrors has been closed so we can ignore it
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
case err, ok := <-commsErrors:
if !ok {
commsErrors = nil
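For context, here is a minimal, self-contained sketch of the pattern the hunk above applies (the names forward and onErr are hypothetical, not the library's API): instead of blocking unconditionally on the outgoing publish channel, the sender also services the error channel, so an error raised elsewhere cannot leave this goroutine stuck forever.

package main

import (
	"errors"
	"fmt"
	"time"
)

// forward hands msg to out, but keeps servicing errs while waiting so that an
// error raised elsewhere is still observed; a bare `out <- msg` send is what
// made the original deadlock possible.
func forward(out chan<- string, errs <-chan error, msg string, onErr func(error)) {
sendLoop:
	for {
		select {
		case out <- msg:
			break sendLoop // handed off successfully
		case err, ok := <-errs:
			if !ok { // error channel closed; stop watching it
				errs = nil
				continue
			}
			onErr(err) // e.g. start an orderly shutdown; safe to call repeatedly
		}
	}
}

func main() {
	out := make(chan string) // unbuffered, like the incoming-publish hand-off
	errs := make(chan error, 1)
	errs <- errors.New("connection lost") // the error arrives before anyone reads from out

	done := make(chan struct{})
	go func() { // the receiver is slow, as it would be while a connection is torn down
		defer close(done)
		time.Sleep(100 * time.Millisecond)
		fmt.Println("received:", <-out)
	}()

	// With a plain blocking send, the error above would not be observed until the
	// receiver finally accepted the message (and never, if it did not).
	forward(out, errs, "hello", func(err error) { fmt.Println("handling:", err) })
	<-done
}

The hunk above does the same thing with incomingPubChan and commsErrors, calling c.internalConnLost when an error is seen so shutdown can proceed.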
101 changes: 101 additions & 0 deletions fvt_client_test.go
@@ -16,7 +16,10 @@ package mqtt

import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"testing"
"time"

@@ -1444,3 +1447,101 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {

c.Disconnect(250)
}

// Issue 209 - occasional deadlock when connections are lost unexpectedly
// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
// replicating it, but the test below would cause it to happen fairly consistently (when run a decent number of times).
// Following the fix it ran 10,000 times without issue.
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
topic := "/test/DisconnectWhileProcessingIncomingPublish"

pops := NewClientOptions()
pops.AddBroker(FVTTCP)
// pops.SetOrderMatters(false) // Not really needed but consistent...
pops.SetClientID("dwpip-pub")
p := NewClient(pops)

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetAutoReconnect(false) // We don't want the connection to be re-established
sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
// sops.SetOrderMatters(false)
sops.SetClientID("dwpip-sub")
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
sDisconnected := make(chan struct{})
sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })

msgReceived := make(chan struct{})
var oneMsgReceived sync.Once
var f MessageHandler = func(client Client, msg Message) {
// No need to do anything when message received (just want ACK sent ASAP)
oneMsgReceived.Do(func() { close(msgReceived) })
}

s := NewClient(sops).(*client) // s = subscriber
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
}

if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
}

// Use a go routine to swamp the broker with messages
if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // p = publisher
t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
}
// We will hammer both the publisher and subscriber with messages
ctx, cancel := context.WithCancel(context.Background())
pubDone := make(chan struct{})
go func() {
defer close(pubDone)
i := 0
for {
p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
// After the connection goes down s.Publish will start blocking (this is not ideal, but fixing it is a problem for another time)
go func(i int) { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }(i) // pass i as an argument to avoid racing with the increment below
i++

if ctx.Err() != nil {
return
}
}
}()

// Wait until we have received a message (ensuring that the stream of messages has started)
select {
case <-msgReceived: // All good
case <-time.After(time.Second):
t.Errorf("no messages received")
}

// We need the connection to drop; unfortunately using any internal method (`s.conn.Close()` etc) will hide the
// behaviour because any calls to Read/Write will return immediately. So we just ask the broker to disconnect us.
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
err := dm.Write(s.conn)
if err != nil {
t.Fatalf("error dending disconnect packet: %s", err)
}

// Let's give the library up to a second to shut down (indicated by the status changing)
select {
case <-sDisconnected: // All good
case <-time.After(time.Second):
cancel() // no point leaving publisher running
time.Sleep(time.Second) // Allow publish calls to time out (otherwise there will be tons of go routines running!)
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
}

cancel() // no point leaving publisher running

select {
case <-pubDone:
case <-time.After(time.Second):
t.Errorf("pubdone not closed within a second")
}
p.Disconnect(250) // Close publisher
}