Resolve potential deadlocks on Resume/Error #434

Merged
merged 2 commits into from Sep 18, 2020
157 changes: 102 additions & 55 deletions client.go
@@ -115,11 +115,9 @@ type client struct {
conn net.Conn // the network connection, must only be set with connMu locked (only used when starting/stopping workers)
connMu sync.Mutex // mutex for the connection (again only used in two functions)

stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
commsobound chan *PacketAndToken // outgoing publish packets serviced by active comms go routines (maintains compatibility)
commsoboundP chan *PacketAndToken // outgoing 'priority' packets serviced by active comms go routines (maintains compatibility)
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
@@ -436,39 +434,47 @@ func (c *client) forceDisconnect() {

// disconnect cleans up after a final disconnection (user requested so no auto reconnection)
func (c *client) disconnect() {
c.stopCommsWorkers()
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
done := c.stopCommsWorkers()
if done != nil {
<-done // Wait until the disconnect is complete (to limit the chance that another connection will be started)
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
}
}
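
The key change in this hunk is the contract: stopCommsWorkers now returns nil when there is nothing to stop, or a channel that is closed once teardown completes, which lets disconnect block until shutdown has genuinely finished. A minimal, self-contained sketch of that idiom (names are illustrative, not from this PR):

package main

import "fmt"

// stopWorkers mimics the new contract: nil when there is nothing to
// stop, otherwise a channel that is closed once teardown has completed.
func stopWorkers(running bool) chan struct{} {
    if !running {
        return nil // caller must check for nil before waiting
    }
    done := make(chan struct{})
    go func() {
        // ... tear down workers here ...
        close(done) // signal that shutdown is fully complete
    }()
    return done
}

func main() {
    if done := stopWorkers(true); done != nil {
        <-done // block until teardown finishes, as disconnect() now does
    }
    fmt.Println("disconnected cleanly")
}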

// internalConnLost cleanup when connection is lost or an error occurs
// Note: This function will not block
func (c *client) internalConnLost(err error) {
// It is possible that internalConnLost will be called multiple times simultaneously
// (including after sending a DisconnectPacket); as such we only do cleanup etc. if the
// routines were actually running and are not being disconnected at the user's request
DEBUG.Println(CLI, "internalConnLost called")
status := atomic.LoadUint32(&c.status)
if status != disconnected && c.stopCommsWorkers() {
DEBUG.Println(CLI, "internalConnLost stopped workers")
if c.options.CleanSession && !c.options.AutoReconnect {
c.messageIds.cleanUp()
}
if c.options.AutoReconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
stopDone := c.stopCommsWorkers()
if stopDone != nil { // stopDone will be nil if the workers are already stopping or stopped
go func() {
DEBUG.Println(CLI, "internalConnLost waiting on workers")
<-stopDone
DEBUG.Println(CLI, "internalConnLost workers stopped")
if c.options.CleanSession && !c.options.AutoReconnect {
c.messageIds.cleanUp()
}
if c.options.AutoReconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
DEBUG.Println(CLI, "internalConnLost complete")
}()
}
DEBUG.Println(CLI, "internalConnLost exiting")
}
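
As the new "will not block" note says, internalConnLost can be invoked from the comms routines themselves, so the wait on stopDone (and the user callback) is pushed onto a separate goroutine. A rough standalone sketch of that shape (names hypothetical, WaitGroup added only so the demo can observe completion):

package main

import (
    "fmt"
    "sync"
)

// connLost must return immediately, so the wait on stopDone and any
// follow-up cleanup happen on a separate goroutine, as in the hunk above.
func connLost(stopDone <-chan struct{}, onLost func(), wg *sync.WaitGroup) {
    if stopDone == nil {
        return // shutdown already in progress elsewhere; nothing to do
    }
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-stopDone // workers have stopped; safe to run cleanup now
        onLost()
    }()
}

func main() {
    var wg sync.WaitGroup
    stopDone := make(chan struct{})
    connLost(stopDone, func() { fmt.Println("connection lost handled") }, &wg)
    close(stopDone) // simulate teardown completing
    wg.Wait()
}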

// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
// outdoing messages.
// outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running)
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
DEBUG.Println(CLI, "startCommsWorkers called")
@@ -505,26 +511,29 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet

// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
// to keep the comms routines clean we want to shutdown the input messages it uses..
c.commsoboundP = make(chan *PacketAndToken)
c.commsobound = make(chan *PacketAndToken)
// to keep the comms routines clean we want to shut down the input channels it uses, so create our own channels
// and copy data across.
commsobound := make(chan *PacketAndToken) // outgoing publish packets
commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packets
c.workers.Add(1)
go func() {
defer c.workers.Done()
for {
select {
case msg := <-c.oboundP:
c.commsoboundP <- msg
commsoboundP <- msg
case msg := <-c.obound:
c.commsobound <- msg
commsobound <- msg
case <-c.stop:
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finnished")
return
}
}
}()
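
Moving commsobound/commsoboundP from struct fields to locals owned by this redirector means each connection gets fresh channels that are closed exactly once, by the goroutine that owns them, while the long-lived c.obound/c.oboundP stay open for the life of the client. A self-contained sketch of the redirector pattern (types simplified, names hypothetical):

package main

import "fmt"

// redirect copies values from a long-lived source onto a per-connection
// channel and closes that channel when stop is closed, so downstream
// consumers can drain and exit without anyone closing a shared channel twice.
func redirect(src <-chan string, stop <-chan struct{}) <-chan string {
    out := make(chan string)
    go func() {
        for {
            select {
            case msg := <-src:
                out <- msg
            case <-stop:
                close(out) // nothing will send on out after this
                return
            }
        }
    }()
    return out
}

func main() {
    src := make(chan string)
    stop := make(chan struct{})
    out := redirect(src, stop)
    go func() {
        src <- "publish" // delivered via the redirector
        close(stop)      // then request shutdown
    }()
    for msg := range out { // range ends when redirect closes out
        fmt.Println("sent:", msg)
    }
}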

commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, c.commsoboundP, c.commsobound)
commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
c.commsStopped = make(chan struct{})
go func() {
for {
@@ -546,7 +555,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
go c.internalConnLost(err) // no harm in calling this if the connection is already down (better than stopping!)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
@@ -558,16 +567,16 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}

// stopCommsWorkers - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
// Returns true if the workers were stopped (use as a signal to restart them if needed)
// Returns nil if the workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
// Note: This may block so run as a go routine if calling from any of the comms routines
func (c *client) stopCommsWorkers() bool {
func (c *client) stopCommsWorkers() chan struct{} {
DEBUG.Println(CLI, "stopCommsWorkers called")
// It is possible that this function will be called multiple times simultaneously due to the way things get shut down
c.connMu.Lock()
defer c.connMu.Unlock()
if c.conn == nil {
DEBUG.Println(CLI, "stopCommsWorkers done (not running)")
return false
c.connMu.Unlock()
return nil
}

// It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
@@ -577,21 +586,25 @@ func (c *client) stopCommsWorkers() bool {
// channels which will allow the comms routines to exit.

// We stop all non-comms related workers first (ping, keepalive, errwatch, resume etc) so they don't get blocked waiting on comms
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil // Important that this is the only place that this is set to nil
c.connMu.Unlock() // As the connection is now nil we can unlock the mutex (allowing subsequent calls to exit immediately)

DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
c.workers.Wait()
doneChan := make(chan struct{})

// As everything relying upon comms is now stopped we can stop the comms outbound channels
close(c.commsobound)
close(c.commsoboundP)
DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
<-c.commsStopped // wait for comms routine to stop
go func() {
DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
c.workers.Wait()

DEBUG.Println(CLI, "stopCommsWorkers done")
return true
// Stopping the workers will allow the comms routines to exit; we wait for these to complete
DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
<-c.commsStopped // wait for comms routine to stop

DEBUG.Println(CLI, "stopCommsWorkers done")
close(doneChan)
}()
return doneChan
}
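
Taken together, stopCommsWorkers is now idempotent under the connection mutex (c.conn == nil is the "already stopping" marker) and does its waiting off the caller's goroutine. A compact sketch of the whole shape under those assumptions (the hub type and its names are hypothetical stand-ins for the client):

package main

import (
    "fmt"
    "sync"
)

// hub is a toy stand-in for the client: Stop is safe to call from several
// goroutines at once, tears down only once, and never blocks the caller.
type hub struct {
    mu      sync.Mutex
    stop    chan struct{}
    running bool
    workers sync.WaitGroup
}

func (h *hub) Stop() chan struct{} {
    h.mu.Lock()
    if !h.running {
        h.mu.Unlock()
        return nil // already stopping/stopped; later callers bail out fast
    }
    h.running = false
    close(h.stop) // signal workers to exit
    h.mu.Unlock() // release the lock before the (potentially slow) wait

    done := make(chan struct{})
    go func() {
        h.workers.Wait() // wait for workers off the caller's goroutine
        close(done)
    }()
    return done
}

func main() {
    h := &hub{stop: make(chan struct{}), running: true}
    h.workers.Add(1)
    go func() { <-h.stop; h.workers.Done() }() // a worker parked on stop
    if done := h.Stop(); done != nil {
        <-done
        fmt.Println("first Stop finished teardown")
    }
    fmt.Println("second Stop returns nil:", h.Stop() == nil)
}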

// Publish will publish a message with the specified QoS and content
@@ -826,12 +839,16 @@ func (c *client) reserveStoredPublishIDs() {

// Load all stored messages and resend them
// Call this to ensure delivery of QoS 1 and 2 messages even after an application crash
// Note: ibound, c.obound and c.oboundP will be read while this routine is running (guaranteed until after ibound gets closed)
// Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
//
func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
DEBUG.Println(STR, "enter Resume")

storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
DEBUG.Println(STR, fmt.Sprintf("resume found NIL packet (%s)", key))
continue
}
details := packet.Details()
@@ -845,24 +862,48 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
token.messageID = details.MessageID
token.subs = append(token.subs, subPacket.Topics...)
c.claimID(token, details.MessageID)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Subscribe packets should not be retained following a reconnect
}
case *packets.UnsubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
c.oboundP <- &PacketAndToken{p: packet, t: nil}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
case *packets.PublishPacket:
token := newToken(packets.Publish).(*PublishToken)
token.messageID = details.MessageID
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
c.obound <- &PacketAndToken{p: packet, t: token}
select {
case c.obound <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
@@ -871,13 +912,19 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
switch packet.(type) {
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
ibound <- packet
select {
case ibound <- packet:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop (ibound <- packet)")
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
}
}
}
DEBUG.Println(STR, "exit resume")
}
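
Every send in resume is now wrapped in a select against c.stop, so the replay loop can no longer wedge when shutdown closes down the consumers on the other end of the channel. The guarded-send idiom in isolation (a minimal sketch; names are hypothetical):

package main

import "fmt"

// sendOrStop mirrors the guarded sends added throughout resume: deliver
// the message, unless shutdown wins the race first.
func sendOrStop(out chan<- string, msg string, stop <-chan struct{}) bool {
    select {
    case out <- msg:
        return true
    case <-stop:
        return false // stop closed; abandon the send instead of deadlocking
    }
}

func main() {
    out := make(chan string) // unbuffered and unread: a bare send would hang
    stop := make(chan struct{})
    close(stop) // simulate a shutdown already requested
    if !sendOrStop(out, "pending publish", stop) {
        fmt.Println("resume-style loop exits cleanly instead of deadlocking")
    }
}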

// Unsubscribe will end the subscription from each of the topics provided.