Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve send on closed channel when order=false and connection closed. #508

Merged
merged 1 commit into from May 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 64 additions & 0 deletions fvt_client_test.go
Expand Up @@ -926,6 +926,70 @@ func Test_PublishEmptyMessage(t *testing.T) {
s.Disconnect(250)
}

// Test_CallbackOverrun - When ordermatters=false the callbacks are called within a go routine. It is possible that
// the connection will drop before the handler completes and this should result in the ACK being dropped silently
// (leads to a panic in v1.3-v1.3.4)
func Test_CallbackOverrun(t *testing.T) {
topic := "/test/callbackoverrun"
handlerCalled := make(chan bool)
handlerChoke := make(chan bool)
handlerError := make(chan error)

pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetOrderMatters(false) // Not really needed but consistent...
pops.SetClientID("callbackoverrun-pub")
p := NewClient(pops)

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetOrderMatters(false)
sops.SetClientID("callbackoverrun-sub")
var f MessageHandler = func(client Client, msg Message) {
handlerCalled <- true
<-handlerChoke // Wait until connection has been closed
if string(msg.Payload()) != "test message" {
handlerError <- fmt.Errorf("Message payload incorrect")
} else {
handlerError <- nil // Allow main test to proceed (should not raise error in go routine)
}
}

s := NewClient(sops).(*client)
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}

if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", sToken.Error())
}

if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
}

p.Publish(topic, 1, false, "test message")
wait(handlerCalled) // Wait until the handler has been called
s.Disconnect(250) // Ensure the connection is dropped
<-s.commsStopped // Double check...
handlerChoke <- true // Allow handler to proceed

err := <-handlerError
if err != nil {
t.Fatalf(err.Error())
}

time.Sleep(time.Microsecond) // Allow a little time in case the handler returning after connection dropped causes an issue (panic)
fmt.Println("reconnecting")
// Now attempt to reconnect (checking for blockages)
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}

s.Disconnect(250)
p.Disconnect(250)
}

// func Test_Cleanstore(t *testing.T) {
// store := "/tmp/fvt/cleanstore"
// topic := "/test/cleanstore"
Expand Down
57 changes: 52 additions & 5 deletions router.go
Expand Up @@ -132,23 +132,58 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
ackChan := make(chan *PacketAndToken)
go func() {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
}

go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
m.Ack()
wg.Done()
}()
}
sent = true
Expand All @@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
m.Ack()
wg.Done()
}()
}
} else {
Expand All @@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
close(ackChan)
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackChan
return ackOutChan
}