
Stops receiving messages #495

Closed
dimonzozo opened this issue Apr 17, 2021 · 9 comments
Labels
Details Required Further information (logs etc) is required before this issue can be investigated

Comments

@dimonzozo

dimonzozo commented Apr 17, 2021

Hi!

I was able to reproduce and capture a debug log of the problem with a failed reconnect (version 1.3.1). When this happens, the client just silently stops receiving any messages.

Here is how it looks in the debug log:

11:20:29 [client]   sending publish message, topic: from/test [D]
11:20:29 [net]      logic waiting for msg on ibound [D]
11:20:29 [net]      startIncomingComms: got msg on ibound [D]
11:20:29 [net]      startIncomingComms: received publish, msgId: 836 [D]
11:20:29 [net]      startIncoming Received Message [D]
11:20:29 [net]      obound msg to write 9189 [D]
11:20:29 [net]      obound wrote msg, id: 9189 [D]
11:20:29 [net]      outgoing waiting for an outbound message [D]
11:20:29 [net]      obound priority msg to write, type *packets.PubackPacket [D]
11:20:29 [net]      outgoing waiting for an outbound message [D]
11:20:29 [pinger]   ping check 0.527490908 [D]
11:20:29 [pinger]   pingresp not received, disconnecting [C]
11:20:29 [client]   internalConnLost called [D]
11:20:29 [client]   stopCommsWorkers called [D]
11:20:29 [client]   internalConnLost waiting on workers [D]
11:20:29 [client]   stopCommsWorkers waiting for workers [D]
11:20:30 [pinger]   ping check 1.600033445 [D]
11:20:30 [net]      startIncoming Received Message [D]
11:20:30 [net]      startIncomingComms: got msg on ibound [D]
11:20:30 [net]      startIncomingComms: received puback, id: 9189 [D]
11:20:30 [net]      logic waiting for msg on ibound [D]
11:20:30 [net]      putting puback msg on obound [D]
11:20:30 [net]      done putting puback msg on obound [D]
11:20:30 [client]   enter Publish [D]
11:20:30 [client]   sending publish message, topic: from/test [D]
11:20:30 [net]      obound msg to write 9190 [D]
11:20:30 [net]      obound wrote msg, id: 9190 [D]
11:20:30 [net]      outgoing waiting for an outbound message [D]
11:20:30 [net]      outgoing waiting for an outbound message [D]
11:20:30 [net]      obound priority msg to write, type *packets.PubackPacket [D]
11:20:30 [net]      outgoing oboundp reporting error  write tcp 127.0.0.1:37424->127.0.0.1:1883: use of closed network connection [E]
11:20:30 [net]      outgoing waiting for an outbound message [D]
11:20:30 [net]      logic waiting for msg on ibound [D]
11:20:30 [net]      startIncomingComms: got msg on ibound [D]
11:20:30 [net]      startIncomingComms: received publish, msgId: 837 [D]
11:20:30 [net]      incoming complete [D]
11:20:31 [net]      startIncoming Received Message [D]
11:20:31 [net]      startIncomingComms: got msg on ibound [D]
11:20:31 [net]      startIncomingComms: received puback, id: 9190 [D]
11:20:31 [net]      logic waiting for msg on ibound [D]
11:20:31 [net]      putting puback msg on obound [D]
11:20:31 [net]      done putting puback msg on obound [D]
11:20:31 [client]   enter Publish [D]
11:20:31 [client]   sending publish message, topic: from/test [D]
11:20:31 [net]      obound msg to write 9191 [D]
11:20:31 [net]      logic waiting for msg on ibound [D]
11:20:31 [net]      startIncomingComms: ibound complete [D]
11:20:31 [net]      obound wrote msg, id: 9191 [D]
11:20:31 [net]      outgoing waiting for an outbound message [D]
11:20:31 [net]      obound priority msg to write, type *packets.PubackPacket [D]
11:20:31 [net]      outgoing oboundp reporting error  write tcp 127.0.0.1:37424->127.0.0.1:1883: use of closed network connection [E]
11:20:31 [net]      startIncomingComms goroutine complete [D]
11:20:32 [net]      startIncoming Received Message [D]
11:20:32 [net]      startIncomingComms: got msg on ibound [D]
11:20:32 [net]      startIncomingComms: received puback, id: 9191 [D]
11:20:32 [net]      logic waiting for msg on ibound [D]
11:20:32 [net]      putting puback msg on obound [D]
11:20:32 [net]      done putting puback msg on obound [D]
11:20:32 [client]   enter Publish [D]
11:20:32 [client]   sending publish message, topic: from/test [D]
11:20:32 [net]      obound msg to write 9192 [D]
11:20:32 [net]      obound wrote msg, id: 9192 [D]
11:20:32 [net]      outgoing waiting for an outbound message [D]
11:20:32 [pinger]   ping check 0.50928487 [D]
11:20:32 [net]      startIncoming Received Message [D]
11:20:32 [net]      startIncomingComms: got msg on ibound [D]
11:20:32 [net]      startIncomingComms: received puback, id: 9192 [D]
11:20:32 [net]      logic waiting for msg on ibound [D]
11:20:32 [net]      putting puback msg on obound [D]
11:20:34 [pinger]   ping check 2.509735274 [D]
11:20:36 [pinger]   ping check 4.508841563 [D]
11:20:38 [pinger]   ping check 6.509643649 [D]
11:20:38 [pinger]   keepalive sending ping [D]
11:20:39 [net]      startIncoming Received Message [D]
11:20:39 [net]      startIncomingComms: got msg on ibound [D]
11:20:39 [net]      startIncomingComms: received pingresp [D]
11:20:39 [net]      logic waiting for msg on ibound [D]
11:20:40 [pinger]   ping check 1.999368095 [D]
11:20:42 [pinger]   ping check 3.999835511 [D]
11:20:44 [pinger]   ping check 6.000838285 [D]
11:20:44 [pinger]   keepalive sending ping [D]
11:20:45 [net]      startIncoming Received Message [D]
11:20:45 [net]      startIncomingComms: got msg on ibound [D]
11:20:45 [net]      startIncomingComms: received pingresp [D]
11:20:45 [net]      logic waiting for msg on ibound [D]
11:20:46 [pinger]   ping check 1.99878406 [D]
11:20:48 [pinger]   ping check 3.99971011 [D]
11:20:50 [pinger]   ping check 5.998825467 [D]
11:20:50 [pinger]   keepalive sending ping [D]
11:20:51 [net]      startIncoming Received Message [D]
11:20:51 [net]      startIncomingComms: got msg on ibound [D]
11:20:51 [net]      startIncomingComms: received pingresp [D]
11:20:51 [net]      logic waiting for msg on ibound [D]

I spent some time trying to figure out what's going on, and I think the issue is the following:

  • stopCommsWorkers waits for the workers to stop after the ping response timeout and enters the disconnecting state
  • the workers ignore "use of closed network connection" errors because a disconnect is in progress
  • the goroutine never closes the commsStopped channel, and everything gets stuck in a deadlock
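A minimal, stdlib-only Go sketch of that failure mode (the worker/stopCommsWorkers/commsStopped names mirror the description above, not the library's actual internals): if a goroutine swallows an expected shutdown error and returns without signalling, whoever waits on its channel hangs forever.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// worker stands in for a comms goroutine. During a disconnect the
// "use of closed network connection" error is expected, so it is
// swallowed - but if the goroutine also forgets to signal its exit,
// stopCommsWorkers blocks forever.
func worker(commsStopped chan struct{}, signalExit bool) {
	// ...read loop exits with "use of closed network connection",
	// which is expected while disconnecting, so it is not reported...
	if signalExit {
		close(commsStopped) // safe pattern: always signal, even on expected errors
	}
}

// stopCommsWorkers waits for the worker to signal; a timeout is used
// here only so the sketch can demonstrate the hang without deadlocking.
func stopCommsWorkers(commsStopped chan struct{}) error {
	select {
	case <-commsStopped:
		return nil
	case <-time.After(100 * time.Millisecond):
		return errors.New("stuck: worker never closed commsStopped")
	}
}

func main() {
	buggy := make(chan struct{})
	go worker(buggy, false)
	fmt.Println(stopCommsWorkers(buggy)) // stuck: worker never closed commsStopped

	fixed := make(chan struct{})
	go worker(fixed, true)
	fmt.Println(stopCommsWorkers(fixed)) // <nil>
}
```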

Any fix or workaround for this would be appreciated.

Thanks!

@dimonzozo
Author

79b446c seems to be related. I'll try to update to 1.3.3.

@dimonzozo
Author

The issue still reproduces on 1.3.3.

@MattBrittan
Contributor

MattBrittan commented Apr 21, 2021

I spent some time trying to figure out what's going on, and I think the issue is the following:

  • stopCommsWorkers waits for the workers to stop after the ping response timeout and enters the disconnecting state
  • the workers ignore "use of closed network connection" errors because a disconnect is in progress
  • the goroutine never closes the commsStopped channel, and everything gets stuck in a deadlock

The goroutine running under startIncoming does not "ignore use of closed network". It just does not send an error when this occurs (the goroutine still exits here). This is deliberate because the only place the connection is closed is in stopCommsWorkers so the error is expected (and logging it adds noise).

The same applies to startOutgoingComms (we need to drain the obound channel, so we keep reading until we detect it's closed; this should happen elsewhere). The interactions between the various goroutines are pretty complex, and a lot of work has gone into looking for potential deadlocks (but it's always possible that one has been missed).

I'm finding your log quite confusing; e.g.:

11:20:31 [net]      startIncomingComms goroutine complete [D]
11:20:32 [net]      startIncomingComms: got msg on ibound [D]

When "startIncomingComms goroutine complete" is logged, the next operation is a return, which means "startIncomingComms: got msg on ibound" (within the same function) should not happen. I suspect you have multiple connections open? If so, that will make debugging very difficult because there is nothing to differentiate them within the log. It's also possible that something weird is going on before the start of your log segment (I'm thinking the package might somehow open a second connection; I cannot work out how, but it may be something I'm missing).

Can you please share some more information on your code (connection options, handlers etc.)? Unfortunately these issues can be very difficult to diagnose, and the cause is often outside of this library (e.g. a MessageHandler locking up and not returning is the most common cause I see; an easy way to rule that out here is to call ClientOptions.SetOrderMatters(false), assuming message order is not critical to your application; see the readme for more info on this). Note that it's likely I'm going to need a way of replicating this issue in order to investigate further (which can be very difficult with intermittent issues like this).
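The blocking-handler failure mode described here can be sketched with a minimal, stdlib-only Go program (the Message type, dispatch loop, and handler names are illustrative stand-ins, not the paho API): handing slow work off to a goroutine keeps the incoming loop draining, which is the same trade-off that ClientOptions.SetOrderMatters(false) enables.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Message stands in for an incoming MQTT publish.
type Message struct{ Topic, Payload string }

// dispatch mimics the client's incoming loop: if handler blocks, the
// loop stops draining messages, and the keepalive machinery eventually
// decides the connection is dead.
func dispatch(ibound <-chan Message, handler func(Message)) {
	for m := range ibound {
		handler(m)
	}
}

func main() {
	ibound := make(chan Message)
	done := make(chan struct{})
	var wg sync.WaitGroup

	slowWork := func(m Message) {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // e.g. a database write
	}

	// Non-blocking handler: hand the work off and return immediately.
	// The loop stays responsive, but messages may complete out of order.
	handler := func(m Message) {
		wg.Add(1)
		go slowWork(m)
	}

	start := time.Now()
	go func() {
		dispatch(ibound, handler)
		close(done)
	}()
	for i := 0; i < 100; i++ {
		ibound <- Message{Topic: "from/test", Payload: fmt.Sprint(i)}
	}
	close(ibound)
	<-done
	wg.Wait()
	// 100 messages complete in roughly one sleep interval, not 100,
	// because the dispatch loop never waited on the slow work.
	fmt.Println("drained quickly:", time.Since(start) < 500*time.Millisecond)
}
```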

@izarraga

Hi

I have the same problem too. In my case it usually occurs after approximately 10 days of operation.

func mqtt_connect(host string, port string) {
	log_to_file(LOG_INFO, "MQTT_SERVER: mqtt_connect()")
	opts := MQTT.NewClientOptions().AddBroker("tcp://" + host + ":" + port)
	opts.SetClientID(MQTT_CLIENT_ID)
	opts.SetUsername(MQTT_USERNAME)
	opts.SetPassword(MQTT_PASSWORD)
	opts.SetCleanSession(true)
	opts.SetOrderMatters(true)
	opts.SetKeepAlive(12 * time.Second)
	opts.SetPingTimeout(10 * time.Second)
	opts.SetProtocolVersion(4) // 3 -> MQTT 3.1, 4 -> MQTT 3.1.1
	//opts.SetWill("my/will/topic", "Goodbye", 1, true)
	//opts.SetDefaultPublishHandler(f)
	opts.SetOnConnectHandler(mqtt_OnConnect)
	opts.SetConnectionLostHandler(mqtt_OnConnectionLost)
	// opts.SetConnectTimeout(8 * time.Second)
	opts.SetAutoReconnect(true)
	// opts.SetMaxReconnectInterval(10 * time.Second)

	mqtt = MQTT.NewClient(opts)
	token := mqtt.Connect()
	if token.Wait() && token.Error() != nil {
		log_to_file(LOG_WARN, "MQTT_SERVER: connect error %v", token.Error())
	} else {
		fmt.Printf("MQTT_SERVER: connected\n")
		log_to_file(LOG_INFO, "MQTT_SERVER: connected")
	}
}

func mqtt_on_msg(client MQTT.Client, msg MQTT.Message) {
	s := strings.Split(msg.Topic(), "/")
	//cs := s[0]
	did := s[1]
	data := string(msg.Payload())
	go mqtt_process_msg(did, data)
}

@dimonzozo
Author

Thank you, @MattBrittan.

I investigated this a bit more.

I suspect you have multiple connections open?

Yes, and I changed the code to log on a single connection.

Can you please share some more information on your code (connection options, handlers etc).

The SetOrderMatters setting is at its default value, true. And I think I know how it causes disconnects... Sometimes processing of incoming messages can get stuck, even though it is done by a group of workers and handed off via channels, and this causes the local broker to decide the client is dead and disconnect it.

For the non-blocking case this should be less likely to happen, but I still think there is a bug somewhere that shows up under some rare conditions when lots of disconnects/reconnects happen.

I will continue investigating and debugging this, and will post here when I have additional info.

@MattBrittan
Contributor

@izarraga I can't see any issues in the code you provided (but it does not include everything); unfortunately, logs are really required in order to diagnose these kinds of issues (there are multiple goroutines running, and I need to understand how they are interacting in order to trace the issue). Note that I use the library myself (around 40 deployments with varying message volumes) and am not experiencing this issue, which suggests it requires a specific set of options/network conditions to manifest. If you are able to provide logs I would appreciate it; my preference would be that you provide these in a new issue (currently there is no way of knowing whether the cause of your issue is the same as @dimonzozo's).

@dimonzozo "Sometimes processing of incoming messages can get stuck... and this causes the local broker to decide the client is dead and disconnect it" - if your message handler is blocking, then this is exactly the kind of behaviour I'd expect. It is quite possible that there is a bug that occurs in rare cases, but I have been unable to find one (well, I've found and fixed quite a few, but cannot see any in the current version); without logs from a single connection showing the failure there is little more I can do (you are most welcome to review the code yourself and see if you can spot something I have missed). The complexity of the various goroutines is one of the reasons we are using a much simpler approach in the v5 client.

@MattBrittan
Contributor

MattBrittan commented Apr 28, 2021

@dimonzozo - are you calling Disconnect()? (if so your issue may match #501)

@MattBrittan MattBrittan added the Details Required Further information (logs etc) is required before this issue can be investigated label May 11, 2021
@MattBrittan
Contributor

The cause of this issue may be the same as #509 which is now fixed (I believe) in @master. The cause of that issue was a deadlock that occurred when the broker disconnected while the client was in a specific state (sending ACK to an incoming publish). Unfortunately this kind of issue is hard to track down without full logs (and ideally a goroutine trace) but getting that info is difficult when the issue cannot be replicated (I have added a test that thoroughly checks broker disconnects whilst operating under a heavy load).

@MattBrittan
Contributor

Closing this due to lack of activity (it may be fixed by either of the two most recent releases, or it may be due to an issue in the user's code - there is no way of knowing with the information currently available - please feel free to reopen if the issue still occurs with the latest release).
