Merge pull request #434 from ChIoT-Tech/deadlock-on-resume
Resolve potential deadlocks on Resume/Error
Al S-M committed Sep 18, 2020
2 parents ca94c53 + 6a43541 commit 9e9179e
Showing 3 changed files with 161 additions and 106 deletions.
157 changes: 102 additions & 55 deletions client.go
@@ -115,11 +115,9 @@ type client struct {
conn net.Conn // the network connection, must only be set with connMu locked (only used when starting/stopping workers)
connMu sync.Mutex // mutex for the connection (again only used in two functions)

stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
commsobound chan *PacketAndToken // outgoing publish packets serviced by active comms go routines (maintains compatibility)
commsoboundP chan *PacketAndToken // outgoing 'priority' packets serviced by active comms go routines (maintains compatibility)
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
}

// NewClient will create an MQTT v3.1.1 client with all of the options specified
@@ -436,39 +434,47 @@ func (c *client) forceDisconnect() {

// disconnect cleans up after a final disconnection (user requested so no auto reconnection)
func (c *client) disconnect() {
c.stopCommsWorkers()
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
done := c.stopCommsWorkers()
if done != nil {
<-done // Wait until the disconnect is complete (to limit the chance that another connection will be started)
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
}
}
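The fix here hinges on a changed contract: stopCommsWorkers now returns nil when there is nothing to stop, or a channel that is closed once shutdown has completed, letting each caller decide whether to block. A minimal runnable sketch of that idiom (names and timings are illustrative, not from the library):

package main

import (
	"fmt"
	"time"
)

// stopWorkers requests shutdown and returns a channel that is closed once
// teardown finishes, or nil if the workers were not running. A caller that
// must not proceed until everything has stopped (like disconnect) blocks on
// the channel; a caller that must not block hands it to a goroutine.
func stopWorkers(running bool) chan struct{} {
	if !running {
		return nil
	}
	done := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the real teardown
		close(done)
	}()
	return done
}

func main() {
	if done := stopWorkers(true); done != nil {
		<-done // synchronous wait, as disconnect now does
	}
	fmt.Println("fully stopped")
}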

// internalConnLost cleanup when connection is lost or an error occurs
// Note: This function will not block
func (c *client) internalConnLost(err error) {
// It is possible that internalConnLost will be called multiple times simultaneously
// (including after sending a DisconnectPacket) as such we only do cleanup etc if the
// routines were actually running and are not being disconnected at the user's request
DEBUG.Println(CLI, "internalConnLost called")
status := atomic.LoadUint32(&c.status)
if status != disconnected && c.stopCommsWorkers() {
DEBUG.Println(CLI, "internalConnLost stopped workers")
if c.options.CleanSession && !c.options.AutoReconnect {
c.messageIds.cleanUp()
}
if c.options.AutoReconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
stopDone := c.stopCommsWorkers()
if stopDone != nil { // stopDone will be nil if the workers are already stopping or have stopped
go func() {
DEBUG.Println(CLI, "internalConnLost waiting on workers")
<-stopDone
DEBUG.Println(CLI, "internalConnLost workers stopped")
if c.options.CleanSession && !c.options.AutoReconnect {
c.messageIds.cleanUp()
}
if c.options.AutoReconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
DEBUG.Println(CLI, "internalConnLost complete")
}()
}
DEBUG.Println(CLI, "internalConnLost exiting")
}
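Because internalConnLost can be invoked from the very comms goroutines being shut down, it must never wait inline; the wait-then-cleanup sequence is pushed onto a fresh goroutine, and only the call that actually initiated the stop (non-nil channel) runs it. The same shape in isolation, as a runnable sketch with invented names:

package main

import (
	"fmt"
	"time"
)

// connLost triggers shutdown and returns immediately. The nil check means
// simultaneous callers cannot schedule the follow-up work twice.
func connLost(stopWorkers func() chan struct{}, cleanup func()) {
	stopDone := stopWorkers()
	if stopDone == nil {
		return // shutdown already being handled elsewhere
	}
	go func() {
		<-stopDone // safe: this goroutine is not one being waited on
		cleanup()  // session cleanup, reconnect, or OnConnectionLost
	}()
}

func main() {
	done := make(chan struct{})
	stop := func() chan struct{} { close(done); return done }
	connLost(stop, func() { fmt.Println("cleanup ran after stop completed") })
	time.Sleep(50 * time.Millisecond) // give the cleanup goroutine time to run
}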

// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
// outdoing messages.
// outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running)
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
DEBUG.Println(CLI, "startCommsWorkers called")
@@ -505,26 +511,29 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet

// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
// to keep the comms routines clean we want to shutdown the input messages it uses..
c.commsoboundP = make(chan *PacketAndToken)
c.commsobound = make(chan *PacketAndToken)
// to keep the comms routines clean we want to shut down the inputs it uses, so we create our own channels
// and copy data across.
commsobound := make(chan *PacketAndToken) // outgoing publish packets
commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packets
c.workers.Add(1)
go func() {
defer c.workers.Done()
for {
select {
case msg := <-c.oboundP:
c.commsoboundP <- msg
commsoboundP <- msg
case msg := <-c.obound:
c.commsobound <- msg
commsobound <- msg
case <-c.stop:
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finished")
return
}
}
}()

commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, c.commsoboundP, c.commsobound)
commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
c.commsStopped = make(chan struct{})
go func() {
for {
@@ -546,7 +555,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
go c.internalConnLost(err) // no harm in calling this if the connection is already down (better than stopping!)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
@@ -558,16 +567,16 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}
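The output redirector above is what lets c.obound and c.oboundP live for the whole client while each connection gets disposable channels: on stop it closes only the per-connection pair, which is the signal for the comms routines to drain and exit. The bridge in isolation, as a compilable sketch (the payload type and names are stand-ins):

package sketch

type packetAndToken struct{} // stand-in for the library's *PacketAndToken

// bridge copies from the client-lifetime channels onto fresh per-connection
// channels until stop is closed, then closes the per-connection channels so
// the downstream consumer terminates instead of blocking forever.
func bridge(obound, oboundP chan *packetAndToken, stop chan struct{}) (chan *packetAndToken, chan *packetAndToken) {
	commsobound := make(chan *packetAndToken)
	commsoboundP := make(chan *packetAndToken)
	go func() {
		for {
			select {
			case msg := <-oboundP:
				commsoboundP <- msg // consumer keeps reading until close
			case msg := <-obound:
				commsobound <- msg
			case <-stop:
				close(commsoboundP) // nothing writes to these anymore
				close(commsobound)
				return
			}
		}
	}()
	return commsobound, commsoboundP
}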

// stopCommsWorkers - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
// Returns true if the workers were stopped (use as a signal to restart them if needed)
// Returns nil if the workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
// Note: This may block so run as a go routine if calling from any of the comms routines
func (c *client) stopCommsWorkers() bool {
func (c *client) stopCommsWorkers() chan struct{} {
DEBUG.Println(CLI, "stopCommsWorkers called")
// It is possible that this function will be called multiple times simultaneously due to the way things get shut down
c.connMu.Lock()
defer c.connMu.Unlock()
if c.conn == nil {
DEBUG.Println(CLI, "stopCommsWorkers done (not running)")
return false
c.connMu.Unlock()
return nil
}

// It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
@@ -577,21 +586,25 @@ func (c *client) stopCommsWorkers() bool {
// channels which will allow the comms routines to exit.

// We stop all non-comms related workers first (ping, keepalive, errwatch, resume etc) so they don't get blocked waiting on comms
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil // Important that this is the only place that this is set to nil
c.connMu.Unlock() // As the connection is now nil we can unlock the mutex (allowing subsequent calls to exit immediately)

DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
c.workers.Wait()
doneChan := make(chan struct{})

// As everything relying upon comms is now stopped we can stop the comms outbound channels
close(c.commsobound)
close(c.commsoboundP)
DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
<-c.commsStopped // wait for comms routine to stop
go func() {
DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
c.workers.Wait()

DEBUG.Println(CLI, "stopCommsWorkers done")
return true
// Stopping the workers will allow the comms routines to exit; we wait for these to complete
DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
<-c.commsStopped // wait for comms routine to stop

DEBUG.Println(CLI, "stopCommsWorkers done")
close(doneChan)
}()
return doneChan
}
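Two details above carry the deadlock fix: c.conn doubles as the "already stopping" flag so connMu can be released before anything waits, and the waits themselves run behind the returned channel. A self-contained sketch of that structure (field and type names are illustrative):

package sketch

import (
	"net"
	"sync"
)

type conns struct {
	connMu  sync.Mutex
	conn    net.Conn
	stop    chan struct{}
	workers sync.WaitGroup
}

// shutdown returns nil if a stop is already underway, otherwise a channel
// closed once every worker has exited. The mutex guards only the check and
// the signalling, never a wait, so concurrent callers return immediately.
func (c *conns) shutdown() chan struct{} {
	c.connMu.Lock()
	if c.conn == nil { // conn == nil doubles as the "stopping/stopped" flag
		c.connMu.Unlock()
		return nil
	}
	close(c.stop) // ask the workers to exit
	c.conn.Close()
	c.conn = nil
	c.connMu.Unlock() // unlock before waiting

	done := make(chan struct{})
	go func() {
		c.workers.Wait() // all workers gone; shutdown is complete
		close(done)
	}()
	return done
}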

// Publish will publish a message with the specified QoS and content
@@ -826,12 +839,16 @@ func (c *client) reserveStoredPublishIDs() {

// Load all stored messages and resend them
// Call this to ensure QoS 1 and 2 message delivery even after an application crash
// Note: ibound, c.obound and c.oboundP will be read while this routine is running (guaranteed until after ibound gets closed)
// Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
//
func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
DEBUG.Println(STR, "enter Resume")

storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
DEBUG.Println(STR, fmt.Sprintf("resume found NIL packet (%s)", key))
continue
}
details := packet.Details()
@@ -845,24 +862,48 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
token.messageID = details.MessageID
token.subs = append(token.subs, subPacket.Topics...)
c.claimID(token, details.MessageID)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Subscribe packets should not be retained following a reconnect
}
case *packets.UnsubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
c.oboundP <- &PacketAndToken{p: packet, t: nil}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
case *packets.PublishPacket:
token := newToken(packets.Publish).(*PublishToken)
token.messageID = details.MessageID
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
c.obound <- &PacketAndToken{p: packet, t: token}
select {
case c.obound <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
@@ -871,13 +912,19 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
switch packet.(type) {
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending incoming (%d)", details.MessageID))
ibound <- packet
select {
case ibound <- packet:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop (ibound <- packet)")
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
}
}
}
DEBUG.Println(STR, "exit resume")
}
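Every send in resume is now a select against c.stop, which removes the deadlock this PR is named for: if the comms side has stopped reading, the replay aborts rather than blocking forever on an unbuffered channel. The guarded-send idiom on its own, runnable as written:

package main

import "fmt"

// send blocks until the value is accepted or stop is closed, and reports
// which happened; a false return is the caller's cue to abandon the replay.
func send(ch chan<- int, v int, stop <-chan struct{}) bool {
	select {
	case ch <- v:
		return true
	case <-stop:
		return false
	}
}

func main() {
	ch := make(chan int) // unbuffered, and nothing is reading from it
	stop := make(chan struct{})
	close(stop) // shutdown has already been requested
	fmt.Println("sent:", send(ch, 1, stop)) // prints "sent: false" (no deadlock)
}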

// Unsubscribe will end the subscription from each of the topics provided.