Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Potential deadlock in matchAndDispatch #470

Merged
merged 1 commit into from Dec 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 29 additions & 16 deletions client.go
Expand Up @@ -206,8 +206,8 @@ func (c *client) setConnected(status uint32) {
atomic.StoreUint32(&c.status, status)
}

//ErrNotConnected is the error returned from function calls that are
//made when the client is not connected to a broker
// ErrNotConnected is the error returned from function calls that are
// made when the client is not connected to a broker
var ErrNotConnected = errors.New("not Connected")

// Connect will create a connection to the message broker, by default
Expand Down Expand Up @@ -438,7 +438,8 @@ func (c *client) forceDisconnect() {
func (c *client) disconnect() {
done := c.stopCommsWorkers()
if done != nil {
<-done // Wait until the disconect is complete (to limit chance that another connection will be started)
<-done // Wait until the disconnect is complete (to limit chance that another connection will be started)
DEBUG.Println(CLI, "forcefully disconnecting")
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
Expand Down Expand Up @@ -498,12 +499,11 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
go keepalive(c, conn)
}

// matchAndDispatch will process messages received from the network. It may generate acknowledgements
// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
incomingPubChan := make(chan *packets.PublishPacket)
c.workers.Add(1)
go func() {
c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
c.workers.Done()
}()
c.workers.Add(1) // Done will be called when ackOut is closed
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

c.setConnected(connected)
DEBUG.Println(CLI, "client is connected/reconnected")
Expand All @@ -526,7 +526,20 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsoboundP <- msg
case msg := <-c.obound:
commsobound <- msg
case msg, ok := <-ackOut:
if !ok {
ackOut = nil // ignore channel going forward
c.workers.Done() // matchAndDispatch has completed
}
commsoboundP <- msg
case <-c.stop:
// Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
if ackOut != nil {
for msg := range ackOut {
commsoboundP <- msg
}
c.workers.Done() // matchAndDispatch has completed
}
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finnished")
Expand All @@ -535,19 +548,19 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}
}()

commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
c.commsStopped = make(chan struct{})
go func() {
for {
if commsIncommingPub == nil && commsErrors == nil {
if commsIncomingPub == nil && commsErrors == nil {
break
}
select {
case pub, ok := <-commsIncommingPub:
case pub, ok := <-commsIncomingPub:
if !ok {
// Incomming comms has shutdown
// Incoming comms has shutdown
close(incomingPubChan) // stop the router
commsIncommingPub = nil
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
Expand All @@ -561,7 +574,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
continue
}
}
DEBUG.Println(CLI, "comms goroutine done")
DEBUG.Println(CLI, "incoming comms goroutine done")
close(c.commsStopped)
}()
DEBUG.Println(CLI, "startCommsWorkers done")
Expand Down Expand Up @@ -999,8 +1012,8 @@ func (c *client) OptionsReader() ClientOptionsReader {
return r
}

//DefaultConnectionLostHandler is a definition of a function that simply
//reports to the DEBUG log the reason for the client losing a connection.
// DefaultConnectionLostHandler is a definition of a function that simply
// reports to the DEBUG log the reason for the client losing a connection.
func DefaultConnectionLostHandler(client Client, reason error) {
DEBUG.Println("Connection lost:", reason.Error())
}
Expand Down
79 changes: 42 additions & 37 deletions router.go
Expand Up @@ -131,47 +131,52 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) {
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(client.oboundP, client.persist, message))
handlers := []MessageHandler{}
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
go func() {
hd(client, m)
m.Ack()
}()
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
ackChan := make(chan *PacketAndToken)
go func() {
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
handlers := []MessageHandler{}
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
go func() {
hd(client, m)
m.Ack()
}()
}
sent = true
}
sent = true
}
}
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
go func() {
r.defaultHandler(client, m)
m.Ack()
}()
}
} else {
go func() {
r.defaultHandler(client, m)
m.Ack()
}()
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
close(ackChan)
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackChan
}