Skip to content

Commit

Permalink
Merge pull request #470 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Resolves potential deadlock in matchAndDispatch
  • Loading branch information
MattBrittan committed Dec 20, 2020
2 parents 72d5136 + 79b446c commit 5695029
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 53 deletions.
45 changes: 29 additions & 16 deletions client.go
Expand Up @@ -206,8 +206,8 @@ func (c *client) setConnected(status uint32) {
atomic.StoreUint32(&c.status, status)
}

//ErrNotConnected is the error returned from function calls that are
//made when the client is not connected to a broker
// ErrNotConnected is the error returned from function calls that are
// made when the client is not connected to a broker
var ErrNotConnected = errors.New("not Connected")

// Connect will create a connection to the message broker, by default
Expand Down Expand Up @@ -438,7 +438,8 @@ func (c *client) forceDisconnect() {
func (c *client) disconnect() {
done := c.stopCommsWorkers()
if done != nil {
<-done // Wait until the disconect is complete (to limit chance that another connection will be started)
<-done // Wait until the disconnect is complete (to limit chance that another connection will be started)
DEBUG.Println(CLI, "forcefully disconnecting")
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
Expand Down Expand Up @@ -498,12 +499,11 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
go keepalive(c, conn)
}

// matchAndDispatch will process messages received from the network. It may generate acknowledgements
// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
incomingPubChan := make(chan *packets.PublishPacket)
c.workers.Add(1)
go func() {
c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
c.workers.Done()
}()
c.workers.Add(1) // Done will be called when ackOut is closed
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

c.setConnected(connected)
DEBUG.Println(CLI, "client is connected/reconnected")
Expand All @@ -526,7 +526,20 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsoboundP <- msg
case msg := <-c.obound:
commsobound <- msg
case msg, ok := <-ackOut:
if !ok {
ackOut = nil // ignore channel going forward
c.workers.Done() // matchAndDispatch has completed
}
commsoboundP <- msg
case <-c.stop:
// Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
if ackOut != nil {
for msg := range ackOut {
commsoboundP <- msg
}
c.workers.Done() // matchAndDispatch has completed
}
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finnished")
Expand All @@ -535,19 +548,19 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}
}()

commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
c.commsStopped = make(chan struct{})
go func() {
for {
if commsIncommingPub == nil && commsErrors == nil {
if commsIncomingPub == nil && commsErrors == nil {
break
}
select {
case pub, ok := <-commsIncommingPub:
case pub, ok := <-commsIncomingPub:
if !ok {
// Incomming comms has shutdown
// Incoming comms has shutdown
close(incomingPubChan) // stop the router
commsIncommingPub = nil
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
Expand All @@ -561,7 +574,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
continue
}
}
DEBUG.Println(CLI, "comms goroutine done")
DEBUG.Println(CLI, "incoming comms goroutine done")
close(c.commsStopped)
}()
DEBUG.Println(CLI, "startCommsWorkers done")
Expand Down Expand Up @@ -999,8 +1012,8 @@ func (c *client) OptionsReader() ClientOptionsReader {
return r
}

//DefaultConnectionLostHandler is a definition of a function that simply
//reports to the DEBUG log the reason for the client losing a connection.
// DefaultConnectionLostHandler is a definition of a function that simply
// reports to the DEBUG log the reason for the client losing a connection.
func DefaultConnectionLostHandler(client Client, reason error) {
DEBUG.Println("Connection lost:", reason.Error())
}
Expand Down
79 changes: 42 additions & 37 deletions router.go
Expand Up @@ -131,47 +131,52 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) {
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(client.oboundP, client.persist, message))
handlers := []MessageHandler{}
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
go func() {
hd(client, m)
m.Ack()
}()
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
ackChan := make(chan *PacketAndToken)
go func() {
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
handlers := []MessageHandler{}
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
go func() {
hd(client, m)
m.Ack()
}()
}
sent = true
}
sent = true
}
}
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
go func() {
r.defaultHandler(client, m)
m.Ack()
}()
}
} else {
go func() {
r.defaultHandler(client, m)
m.Ack()
}()
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
close(ackChan)
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackChan
}

0 comments on commit 5695029

Please sign in to comment.