
occasional deadlock when connections are lost #509

Closed · dehort opened this issue May 24, 2021 · 7 comments

dehort commented May 24, 2021

I am seeing an issue where occasionally my paho client will deadlock. When this happens, it will not be able to send or receive messages.

This appears to be triggered when the connection to the broker is lost.

This is what I see in the goroutine stack trace dump, at a high level (a minimal standalone sketch of the cycle follows the list):

  • router.matchAndDispatch() is blocked in m.Ack() (ackFunc()), so it cannot read from messages
  • client.startCommsWorkers() goroutine 1 can't read ackOut (from ackFunc()) because it's blocked writing to commsoboundP
  • client.startCommsWorkers() goroutine 2 can't progress because it's blocked writing to incomingPubChan
  • startOutgoingComms() can't progress because it's blocked writing to errChan
  • startComms() can't progress because it's waiting to write on outError
  • client.startCommsWorkers() goroutine 2 can't read commsErrors (outError from startComms()) because it's waiting to write to incomingPubChan
  • router.matchAndDispatch() can't read messages (incomingPubChan) because m.Ack() (ackFunc) is blocked
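
For illustration, here is a minimal standalone sketch of the kind of send/send cycle described above. This is not paho code; the channel names are stand-ins for the real ones.

package main

// Two goroutines each blocked sending on an unbuffered channel that only the
// other goroutine would ever receive from, so neither can make progress.
// Running this, the Go runtime reports "fatal error: all goroutines are
// asleep - deadlock!".
func main() {
	ack := make(chan struct{})      // stands in for oboundP / ackOut
	incoming := make(chan struct{}) // stands in for incomingPubChan

	go func() {
		ack <- struct{}{} // blocked: main never reaches <-ack below
		<-incoming
	}()

	incoming <- struct{}{} // blocked: the goroutine above never reaches <-incoming
	<-ack
}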

The client doesn't appear to receive any errors from paho when this happens. If it continues to send messages, it will eventually see the "publish was broken by timeout" error, which makes sense: the publish will eventually be unable to write to obound, so the timeout gets triggered. https://github.com/eclipse/paho.mqtt.golang/blob/master/client.go#L721 This code makes me think the client believes it is still connected.

I will attach the stack traces that I have gathered.

This appears to be the same issue as #328

dehort commented May 24, 2021

This is using paho 1.3.1

Stack traces are here:

HANG_01_01.txt
HANG_01_02.txt

These files are from the same run. HANG_01_02.txt was taken a bit later than HANG_01_01.txt.
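
(For reference, one way goroutine dumps like these can be captured — an assumption, not necessarily how these particular traces were produced — is to print every goroutine's stack on demand via runtime/pprof:)

package main

import (
	"os"
	"os/signal"
	"runtime/pprof"
	"syscall"
)

func main() {
	// Dump all goroutine stacks to stderr whenever the process receives
	// SIGUSR1; debug=2 prints full, panic-style stack traces.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGUSR1)
	go func() {
		for range sig {
			_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
		}
	}()

	select {} // stand-in for the real application's work
}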

dehort commented May 24, 2021

More notes from the stack trace:

ackFunc()

  • blocks writing to oboundP - line 458
    oboundP is ackChan in matchAndDispatch
    oboundP is ackOut in startCommsWorkers

stopCommsWorkers()

  • blocks waiting on c.workers.Wait() - line 614

internalConnLost

  • calls stopCommsWorkers() - blocked
  • spawns a goroutine which blocks waiting on stopDone - line 460

startCommsWorkers - goroutine 1

  • blocks writing on commsoboundP - line 540
  • can't get down to call c.workers.Done()...
    this hangs stopCommsWorkers which hangs internalConnLost()

startCommsWorkers - goroutine 2

  • blocks writing on incomingPubChan - line 567

startOutgoingComms

  • blocks writing on errChan - line 321
  • looks like this happens when there is a failure to write on the connection
  • it's blocked trying to send an error to errChan
  • this should get processed by startComms() - line 425
    • but startComms() is blocked waiting to write on the outErr channel
      • and startCommsWorkers() is blocked waiting to send on incomingPubChan - line 567
        ... so it can't read from commsErrors

        • incomingPubChan is blocked because of something in matchAndDispatch
          • matchAndDispatch is blocked because it calls m.Ack()
            • m.Ack() is ackFunc() (see ackFunc() above)
              • ackFunc() is blocked writing to oboundP
                • this is happening because the startCommsWorkers goroutine is blocked writing to commsoboundP
                  ...so it can't get back around to reading ackOut
                  • commsoboundP is passed to startComms()
                    • startComms() passes commsoboundP as oboundp to startOutgoingComms()
                    • startOutgoingComms() is blocked writing to errChan
                    • startComms() is blocked waiting to write to outErr
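
To make the stopCommsWorkers/internalConnLost part of these notes concrete, here is a standalone sketch (not paho code) of why a WaitGroup Wait() hangs when a worker is stuck on a channel send:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var workers sync.WaitGroup
	out := make(chan int) // unbuffered; nothing ever receives from it here

	workers.Add(1)
	go func() {
		defer workers.Done()
		out <- 1 // blocked forever, so Done() is never reached
	}()

	done := make(chan struct{})
	go func() {
		workers.Wait() // blocked: the worker above never calls Done()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("workers stopped")
	case <-time.After(time.Second):
		fmt.Println("stuck: Wait() cannot return while a worker is blocked on a send")
	}
}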

MattBrittan commented May 24, 2021

Thanks for the detailed notes (this kind of issue can be difficult to duplicate and track down!). I'm assuming you are using OrderMatters = true (the default), because otherwise m.Ack() would be called in a goroutine (and I don't believe this issue would arise).
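
For reference, a minimal sketch of the option being referred to (the broker address is a placeholder, and whether disabling ordering actually avoids this deadlock is an assumption, not something confirmed here):

package main

import (
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// SetOrderMatters(false) allows the client to call message handlers
	// (and therefore Ack) from separate goroutines, at the cost of the
	// per-subscription in-order delivery guarantee.
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://broker.example.com:1883"). // placeholder address
		SetOrderMatters(false)
	_ = mqtt.NewClient(opts)
}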

If I am understanding this correctly, what is happening is:

  • Outgoing comms is blocked reporting a comms error
    • because startCommsWorkers is blocked by an incoming publish
    • because matchAndDispatch is blocked acknowledging a message - the blockage occurs within ackFunc sending to oboundP
    • because oboundP processing is blocked attempting to send to commsoboundP (due to c.Stop being closed). The idea of this code is to send acknowledgments when the comms functions are being closed down cleanly.
    • because commsoboundP is blocked in outgoing comms due to the processing goroutine's attempt to transmit an error (loop back up to the top!)

That being the case, I think the best solution will be to modify client.go such that sending to incomingPubChan can be preempted by a receive on commsErrors, i.e.:

select {
case pub, ok := <-commsIncomingPub:
	if !ok {
		// Incoming comms has shutdown
		close(incomingPubChan) // stop the router
		commsIncomingPub = nil
		continue
	}
sendPubLoop:
	for {
		select {
		case incomingPubChan <- pub:
			break sendPubLoop
		case err, ok := <-commsErrors:
			if !ok { // commsErrors has been closed so we can ignore it
				commsErrors = nil
				continue
			}
			ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
			c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
			continue
		}
	}
case err, ok := <-commsErrors:
	if !ok {
		commsErrors = nil
		continue
	}
	ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
	c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
	continue
}

I'm not really happy with this solution, but things have become pretty convoluted in the attempt to maintain backwards compatibility, and delivering incoming publish messages seems to be what a user would expect. Thoughts?

Note: the symptoms may be similar to that issue, but there have been major changes since it was raised (this commit), so I don't think the two can be compared. A range of deadlock-type issues have been raised, and those have been fixed where possible, but often there has been insufficient information to track them down (or to assess whether the issue was in this library or in the user's code).

dehort commented Jun 2, 2021

Hi @MattBrittan, sorry for the delayed response. Yes, this has been difficult to reproduce.

That seems like a reasonable approach.

Thanks!

MattBrittan added a commit to ChIoT-Tech/paho.mqtt.golang that referenced this issue Jun 3, 2021
MattBrittan commented

Sorry for the delays on this; I wanted to write a test that duplicated the issue so I could be confident that the fix worked. This took a while (and the test needs to be run a fair number of times to be sure the issue will arise), but I can now reliably replicate the issue and confirm that the fix works (10,000 iterations of the test ran without issue). I have committed the change, so I would appreciate it if you could try @master.
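
As an aside, a sketch of the general pattern such a test can use (the function names below are placeholders, not the actual test that was committed): run the suspect sequence in a goroutine, bound how long it may take, and repeat the test many times to shake out the timing-dependent case.

package main

import (
	"testing"
	"time"
)

// exerciseClient is a placeholder for the real connect / drop-connection /
// publish sequence; it is not part of paho.mqtt.golang.
func exerciseClient() {
	time.Sleep(10 * time.Millisecond)
}

// TestNoDeadlock fails if the sequence hangs. Running it repeatedly, e.g.
// go test -race -run TestNoDeadlock -count=10000, increases the chance of
// hitting a timing-dependent deadlock.
func TestNoDeadlock(t *testing.T) {
	done := make(chan struct{})
	go func() {
		exerciseClient()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(30 * time.Second):
		t.Fatal("possible deadlock: client sequence did not complete")
	}
}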

dehort commented Jun 4, 2021

Unfortunately, I have lost access to the misbehaving broker that I was using when I ran into this bug. So I'm not quite sure how to reproduce it anymore.

Thank you for working to resolve this issue. I appreciate it!

MattBrittan commented

Normally I'd leave this open, but as @dehort no longer has access to the environment within which it arose, there is no way to confirm that the initial issue is actually fixed (I was able to confirm that there was a problem, which is now fixed, but cannot be 100% sure that this was the issue @dehort encountered). Thanks very much for the detailed information provided in this issue; without it I doubt I would have found the problem that has now been fixed (it was in code that I have reviewed a number of times in the past, but it was difficult to spot because it arises from the interactions of multiple goroutines).

I have just released v1.3.5 which includes the fix.

mihaitodor added a commit to mihaitodor/benthos that referenced this issue Jul 2, 2021
- Disconnect hang: eclipse/paho.mqtt.golang#501
- Occasional deadlock when connections are lost: eclipse/paho.mqtt.golang#509