
paho disconnected silently without calling onConnLost handler #453

Closed
GopherJ opened this issue Sep 18, 2020 · 27 comments
Labels
Details Required — Further information (logs etc.) is required before this issue can be investigated

Comments


GopherJ commented Sep 18, 2020

func NewClientId() string {
        rndString := util.RandString(8)
        return fmt.Sprintf("sbl-drive-dispatcher-%s", rndString)
}

func NewClientOptionsFromConfig(config model.Config) (*MQTT.ClientOptions, error) {
        opts := MQTT.NewClientOptions()
        opts.AddBroker(config.MQTT.Url)
        opts.SetClientID(NewClientId())
        opts.SetCleanSession(true)
        opts.SetKeepAlive(time.Duration(15) * time.Second)
        opts.SetAutoReconnect(true)

        if config.MQTT.Username != "" && config.MQTT.Password != "" {
                opts.SetUsername(config.MQTT.Username)
                opts.SetPassword(config.MQTT.Password)
        }

        if config.MQTT.CACert != "" && config.MQTT.ClientKey != "" && config.MQTT.ClientCert != "" {
                tlsConfig, err := tls.NewTLSConfig(config.MQTT)
                if err != nil {
                        return nil, err
                }
                opts.SetTLSConfig(tlsConfig)
        }

        return opts, nil
}

func NewClient() (MQTT.Client, error) {
        config := model.NewConfigFromFile("Config.toml")
        opts, err := NewClientOptionsFromConfig(*config)
        if err != nil {
                return nil, err
        }
        opts.SetOnConnectHandler(onConnectHandler)
        opts.SetConnectionLostHandler(onConnectionLostHandler)

        client := MQTT.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
                return nil, token.Error()
        }
        return client, nil
}

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        sharedChannel := shared.NewSharedChannel()
        sharedChannel.MqttSubscriber <- message
}

func onConnectHandler(c MQTT.Client) {
        logger := logger.NewSharedSugaredLogger()
        qos := byte(0)
        topics := [...]string{
                "example1",
                "example2",
        }

        for _, topic := range topics {
                if token := c.Subscribe(topic, qos, onMessageReceived); token.Wait() && token.Error() != nil {
                        logger.Fatalf("mqtt", "failed to subscribe topic: %s, error: %v", topic, token.Error())
                }
        }
}

func onConnectionLostHandler(c MQTT.Client, reason error) {
        logger := logger.NewSharedSugaredLogger()
        logger.Errorf("mqtt", "lost connection, error %s\n", reason.Error())
}

GopherJ commented Sep 18, 2020

I've read #263 #430. This error happens with a cloud broker (EMQX 4.1 & 4.2); it only happens when I use paho.mqtt.golang to connect to the broker (mosquitto bridges to the broker correctly).

paho.mqtt.golang works correctly after a restart, but after some time it starts to lose connections, and onConnectionLostHandler gets called again and again.

Details:

I think the error is on the paho.mqtt.golang side, because if I use mosquitto as the client it works correctly and the connection is not lost.


GopherJ commented Sep 18, 2020

I also found that if I remove SSL, put the client code on the cloud server, and connect to 1883 instead of 8883, it works correctly.

I'm not sure whether the error is caused by PingTimeout; it defaults to 10 seconds. Could 10 seconds be too small?


MattBrittan commented Sep 18, 2020

Hi @GopherJ,

Unfortunately your description and code do not provide much information to work with. For example, you make no mention of specific errors. Can you please:

  • Confirm which version of the package you are using (and, if not master, please try that: go get github.com/eclipse/paho.mqtt.golang@master if using modules).
  • Provide logs obtained when the issue is occurring (see my response on this issue for instructions on enabling logging). This does not always provide enough information but without logs or an application that reproduces the fault there is little anyone can do to help you.
  • Ideally, broker logs if possible (it's nice to see what is happening at both ends of the connection). Note that you will need to provide these to the EMQX team to enable them to investigate the issue you raised there.
  • Confirm that the send to the channel returned by shared.NewSharedChannel() is not blocking (as mentioned in the other issue, blocking in a handler is a bad thing!). If the order of messages is not essential you may want to change this to go func(){sharedChannel.MqttSubscriber <- message}() to ensure that is not the issue (a slightly fuller sketch follows this list) or, alternatively, add logging at the start/end of this function so you can confirm that it is not your code that is causing the issue.
  • Ideally provide a Minimal, Reproducible Example. I understand that it can be hard to write something that consistently replicates a fault but it makes tracking down the issues a lot easier!
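
For illustration, a minimal sketch of what a non-blocking hand-off could look like (the channel name, buffer size, and worker are assumptions for this sketch, not part of this library or your code):

import (
        "log"

        MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Buffered so short bursts of messages do not stall paho's receive loop.
var messages = make(chan MQTT.Message, 1000)

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        select {
        case messages <- message:
        default:
                // Channel full: log and drop rather than block the MQTT client.
                log.Printf("dropping message on %s: consumer is falling behind", message.Topic())
        }
}

// startWorker drains the channel in its own goroutine; the real work
// happens here, outside the MQTT callback.
func startWorker() {
        go func() {
                for m := range messages {
                        _ = m.Payload() // process the message
                }
        }()
}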

Note: One thing that would cause the symptoms you describe is two connections with the same client ID. Are you sure that util.RandString(8) is not producing duplicates? (The broker must disconnect any existing connection when a new connection is established with the same ID, so you can end up with two clients continually disconnecting each other.)
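
If duplicates are a possibility, one alternative is to draw the random part from crypto/rand; a sketch (replacing your util.RandString helper, which I haven't seen):

import (
        "crypto/rand"
        "encoding/hex"
        "fmt"
)

// NewClientId uses 8 bytes from crypto/rand, which makes accidental
// client-ID collisions effectively impossible.
func NewClientId() (string, error) {
        buf := make([]byte, 8)
        if _, err := rand.Read(buf); err != nil {
                return "", err
        }
        return fmt.Sprintf("sbl-drive-dispatcher-%s", hex.EncodeToString(buf)), nil
}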

Note 2: I doubt that your SSL certificate has anything to do with this (it will either work or not). SSL does add some overhead, which may have an impact if this is a weird timing issue, or there may be a problem handling errors during the SSL handshake (there is a pull request re that, but I think that is more an issue of an incorrect error being returned rather than no error).

Thanks,
Matt


GopherJ commented Sep 18, 2020

Hi, sorry, my fault. I'll do it today once I get some time.

BTW I checked EMQX's dashboard and the client ID isn't duplicated, but yes, I'll try improving this part first.


GopherJ commented Sep 25, 2020

Hi @MattBrittan, I can always reproduce this by running a simple program for a morning.

Could you help check why it happens and how to fix it? I don't have this problem with mosquitto; I'm 100% sure it's related to paho.mqtt.golang.

mosquitto + emqx => ok
paho.mqtt.rust + emqx => ok
paho.mqtt.golang + emqx => pingresp not received, disconnecting
paho.mqtt.golang + mosquitto => pingresp not received, disconnecting

We switched from paho.mqtt.rust to paho.mqtt.golang because paho.mqtt.rust doesn't support async/await yet.

However, since this change we have had this issue.


GopherJ commented Sep 25, 2020

BTW I confirm that I'm using the latest paho.mqtt.golang.


alsm commented Sep 25, 2020

You haven't provided the demo program yet, but looking at the code you posted initially, are you sure that this

func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        sharedChannel := shared.NewSharedChannel()
        sharedChannel.MqttSubscriber <- message
}

is working as you intend and that something is reading the messages off the channel you create? If that function is blocking and not returning, or getting into a state like that, it could block the receiving of packets and PINGRESPs would not be processed.


GopherJ commented Sep 26, 2020

@alsm I don't think anything blocks. They are completely different goroutines...

@alsm @MattBrittan Please, there are already #429, #430, #263 ...

and...

mosquitto + emqx => ok
paho.mqtt.rust + emqx => ok
paho.mqtt.golang + emqx => pingresp not received, disconnecting
paho.mqtt.golang + mosquitto => pingresp not received, disconnecting

If all of this doesn't make you think there is a problem, I don't know what to say... As I said, I don't have other code to show you; it's really just a simple subscribe. What else can I do?...

The version that I'm using is 1.2.0, which should be the latest release. Now I'm testing master; I'll let you know whether this error disappears or still happens.

@someburner

Updated to latest and am now also experiencing this. Definitely something going on.

@MattBrittan

@GopherJ Unfortunately, unless you can provide the information requested, it's unlikely that we will be able to help you; this package provides a lot of options and it's quite possible that you are using a combination that no one else is (so others may well not see what you are seeing). The issues you have referenced are either old (the code having been rewritten), most likely issues with the users' code, or have insufficient information to determine the cause. The issue that @alsm raised (and I had mentioned previously) is probably the thing that most frequently causes the symptoms you are experiencing (because the go-routines are communicating through a channel they can block each other). There have been some major changes since version 1.2.0 and I hope we will be able to make a new release shortly, so for now I recommend using @master.

@someburner please provide more information (preferably with logs). What version did you come from? There was a change some time back that removed the buffers from a number of channels and exposed issues with users code that were previously hidden (e.g. blocking in a callback, possibly by calling Publish from within the callback). It may be simpler if you raise a new issue because often issues that seem similar at first glance are actually quite different (and as @GopherJ has only just moved to @master it seems unlikely they were hitting whatever issue you are).

@someburner

@MattBrittan just to update: this was happening with mosquitto at a high load (10K devices and around 10K msg/sec). After switching to another broker that handles the throughput better, I no longer see this; it was probably unrelated, as you say.

I know this is outside the scope of this project, but in case others run across this, or someone has experience here: there seem to be issues with a QoS 1 subscriber receiving high message loads from mosquitto; it looks like mosquitto does not get the client's PUBACK. It could be that I'm doing something wrong, but emqx and jorramq seem to handle the high loads better without my code changing.

@MattBrittan

@someburner which commit are you using? PR #443 was committed a couple of weeks ago and prevents this package from reusing message IDs immediately. I had an issue where Mosquitto was dropping messages and originally implemented this as a way to trace the issue (tracing things in the Mosquitto logs is difficult when IDs are constantly reused), but the change resolved the issue; my theory, without any real evidence, was that Mosquitto may not be clearing message IDs from its store immediately and so drops some because they seemingly have duplicate IDs (the issue only occurred when sending multiple messages per second). There is a slight possibility that your issue is related (i.e. the ping messages get dropped); I did have a scan through the Mosquitto source but could not see anything obvious there (but my C skills are a bit rusty!) and, as the problem had gone away, I moved on to other things.
Sorry, but as I only process around 10 messages/sec this is not an issue I'm likely to encounter for some time. Still, it's great to hear that this package is being used to process that kind of message volume, and I would really appreciate your feedback on @master as I think we should make a new release soon!


someburner commented Oct 2, 2020

I have been habitually doing go get -u on this repo, and I tested that the issue was occurring both before and after that commit. For the time being I'm just accepting that mosquitto is not suited to large deployments unless you use QoS 0 for everything. It's possible there is a message-ID issue that is cascading into the resulting PINGRESP issue; I may take a deep dive into it later.

For now the library seems to work fine with my high load tests on other brokers but I'll definitely let you know if anything comes up. Thanks for being a responsive maintainer! I have been using master without issue for most of these tests so at least for my use case I'd say it's solid.


GopherJ commented Oct 28, 2020

This error is happening on our production site, which is very dangerous. Since the paho.mqtt.golang team doesn't accept that there are any errors on their side, doesn't want to investigate a single line of code, and always thinks we are using old versions even though we are on master already..., I suggest that people who use paho.mqtt.golang be cautious, because this error may happen to you.

I'll rewrite this part with another library (paho.mqtt.c / paho.mqtt.rust / MQTT.js).


MattBrittan commented Oct 28, 2020

@GopherJ As far as I can see, from a review of the comments, no-one has said that there is not an issue; what we have said is that we need you to provide information that would enable us to identify/trace the problem (and pointed out a possible issue with your code that may explain the issue e.g. if sharedChannel.MqttSubscriber <- message blocks).

At this point we do not have enough information to investigate your issue further (and cannot even ascertain if the problem is with this package or elsewhere). If you provide the logs (and ideally code that enables the issue to be duplicated) then the issue will be investigated; please bear in mind that work on this project is carried out by volunteers so making an effort to respond to their questions is likely to lead to a faster resolution.
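
For reference, the library's logging can be enabled by assigning its package-level loggers; anything with Println/Printf (such as *log.Logger) works. For example:

import (
        "log"
        "os"

        MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Route the library's internal logging to stdout/stderr so the disconnect
// sequence shows up in your application's output.
func enablePahoLogging() {
        MQTT.ERROR = log.New(os.Stderr, "[ERROR] ", log.LstdFlags)
        MQTT.CRITICAL = log.New(os.Stderr, "[CRIT]  ", log.LstdFlags)
        MQTT.WARN = log.New(os.Stderr, "[WARN]  ", log.LstdFlags)
        MQTT.DEBUG = log.New(os.Stdout, "[DEBUG] ", log.LstdFlags)
}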

@MattBrittan

@someburner just thought it might be worth referencing issue #458 because your issue may have had the same cause. In summary the mosquitto option max_queued_messages defaults to 100 messages and after that messages may be dropped (I can see how that might be happening with the message volumes you are processing).


GopherJ commented Nov 5, 2020

Now I can see that paho.mqtt.golang cannot recover subscriptions after a disconnect; after running for half a day or a day I can always reproduce this.

I have cleanSession: false and reconnect: true, but it never succeeds in recovering the subscriptions.

Before rewriting everything, for now I have to kill the process once it disconnects.


MattBrittan commented Nov 5, 2020

Now I can see paho.mqtt.golang cannot recover subscriptions from a disconnect

What led you to believe it would?

The broker should retain subscriptions as part of the session state in defined circumstances (see the clean session section). paho.mqtt.golang makes no attempt to re-create subscriptions when reconnecting (partly because it may not be necessary; I believe the python library takes the same approach).

If your broker is not retaining the subscriptions then I'd recommend you check the spec (to ensure that you meet the criteria under which the session state should be retained) and your broker's settings (not all brokers comply with the spec). If you want to re-establish subscriptions then you will need to do it yourself (example here, and a minimal sketch below).
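
A minimal sketch of that pattern, with option values and topic names that are illustrative rather than taken from your code: subscribing inside the OnConnect handler covers both the initial connect and every automatic reconnect, so subscriptions are restored even if the broker did not keep the session.

import (
        "log"
        "time"

        MQTT "github.com/eclipse/paho.mqtt.golang"
)

func newOptions(broker string) *MQTT.ClientOptions {
        opts := MQTT.NewClientOptions().
                AddBroker(broker).
                SetClientID("example-client").
                SetCleanSession(false). // ask the broker to keep session state across reconnects
                SetAutoReconnect(true).
                SetKeepAlive(15 * time.Second)

        // Runs after the initial connect and after every automatic reconnect.
        opts.SetOnConnectHandler(func(c MQTT.Client) {
                handler := func(_ MQTT.Client, m MQTT.Message) {
                        log.Printf("message on %s", m.Topic())
                }
                for _, topic := range []string{"example1", "example2"} {
                        if token := c.Subscribe(topic, 1, handler); token.Wait() && token.Error() != nil {
                                log.Printf("re-subscribe to %s failed: %v", topic, token.Error())
                        }
                }
        })
        return opts
}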

Note that if the broker is not retaining the session state then you are likely to lose messages when you reconnect (because anything published while the connection is down becomes part of the session state). So if that is important to you then fixing the broker setup should be a priority.

Matt


GopherJ commented Nov 6, 2020

@MattBrittan as you can see from my previous comments:

mosquitto + emqx => ok
paho.mqtt.rust + emqx => ok
paho.mqtt.golang + emqx => pingresp not received, disconnecting
paho.mqtt.golang + mosquitto => pingresp not received, disconnecting

So I don't know why I need to fix my broker. I'll close this issue because it's not useful at all to debate with you. You are just assuming every error is on the users' side without investigating a single line of code.

paho.mqtt.golang is the worst MQTT client library that I've used, with a maintainer who keeps suggesting upgrading to master, checking the clean session section, providing a reproducible example... it's even worse; I'll never use it again.

I can understand that it's difficult to maintain an open source project in your free time, but providing unhelpful advice will not help at all. If I spend time creating this issue and debating with you, that means there is a problem; otherwise I would not waste the time.

@GopherJ GopherJ closed this as completed Nov 6, 2020
@GopherJ GopherJ reopened this Nov 30, 2020

GopherJ commented Nov 30, 2020

This error has started to happen on another of our company's Golang projects...

@MattBrittan

@GopherJ as discussed previously debugging information will be required if we are to offer you any help (please see my previous comments or the readme for examples of the information that is useful). This issue is most commonly due to the callback passed to the 'Subscribe' function blocking (so within your control rather than part of this library) - I'm not saying that this is your issue but that is the first thing I would suggest checking (starting off a goroutine, as suggested in my earlier comment, is a quick way to eliminate this as a cause).

@GopherJ GopherJ changed the title pingresp not received, disconnecting paho disconnected silently without calling onConnLost handler Nov 30, 2020
@MattBrittan MattBrittan added the Details Required label (further information, e.g. logs, is required before this issue can be investigated) Dec 15, 2020

yunfuyiren commented Dec 20, 2020

@GopherJ did you ever find a solution? I am facing the same problem too, with mosquitto 2.0.0 and paho 1.3.0.
The broker reported a "Socket err" for the client, but there is no output in the client log.


GopherJ commented Dec 20, 2020

@yunfuyiren @MattBrittan Hi, I have gained some understanding of this issue. It's caused by a paho.mqtt.golang limitation plus a Golang channel issue.

I have around 300 IoT devices broadcasting to the broker, and sometimes there is a massive message stream. I use a channel to send messages to other goroutines for handling.

However, if there are too many messages, the channel will block and eventually cause paho.mqtt.golang to disconnect, because, as the FAQ explains, paho.mqtt.golang doesn't tolerate blocking code in the message handler.

I haven't solved this issue yet, and it's happening in many of our projects. It's hard to say under which conditions paho.mqtt.golang will lose the connection and never reconnect. What is the definition of blocking code? It even happens to a small project which basically does pretty simple work in the message handler.

I feel this part is out of my control; paho.mqtt.golang should allow users to detect this issue, otherwise it can be dramatic.

Losing the connection with no way to detect it drove me crazy during the last month; now I'm on other projects so I have put it aside.

@MattBrittan

Thanks for the extra detail @GopherJ.

What is the definition of blocking code?

By default (SetOrderMatters(true)) the handlers are called synchronously and, while waiting for the handler to return, the library will not process any incoming messages. If order is not important to you then setting SetOrderMatters(false) might help - with this setting your handler will be called in a new go-routine.

The issue you are seeing appears (with the limited info available) to be due to a handler blocking for around 10 seconds (so a considerable period). Because the library does not process incoming messages whilst waiting on the handler it does not see the ping response from the broker and ends up disconnecting because of that.

So blocking really means any action that unduly delays your handler from returning. In the code you shared a while ago sharedChannel.MqttSubscriber <- message may block if sharedChannel.MqttSubscriber is unbuffered or the buffer is full. Unless it is essential that the actions sharedChannel.MqttSubscriber carries out are completed before the ACK is sent I would recommend using go func(){sharedChannel.MqttSubscriber <- message}() to prevent any chance of this blocking. If any of the code in the handler attempts to use the paho library to send another message this may lead to a deadlock (so don't call Publish() from within a handler; doing so in a go-routine is fine). Note that generally you would set things such that keep-alives only require a response within 10 seconds (PingTimeout) and this means that your handler does have a bit of time to play with.
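
For illustration only, a rough sketch of those two mitigations (sharedChannel here is a stand-in for your application's own channel, not part of the library):

import (
        MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Stand-in for the application's own channel.
var sharedChannel = make(chan MQTT.Message, 100)

// Option 1: ask the library to call handlers in their own goroutines
// (message ordering is no longer guaranteed).
func configure(opts *MQTT.ClientOptions) {
        opts.SetOrderMatters(false)
}

// Option 2: keep ordered delivery, but make the handler itself return
// immediately, pushing the potentially blocking send into a goroutine.
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
        go func() { sharedChannel <- message }()
}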

It even happens to a small project which basically does pretty simple work in the message handler.

Please provide the code for this simple example; as mentioned previously if I can duplicate the issue then I can probably fix it (or point out where the issue is in your code if it's not in the library). The library uses quite a few go-routines and their interactions can be complex which makes it hard to spot an issue without logs to point you in the right direction (issue #468 is an example of such an issue, it occurred in a very rare set of circumstances but due to the logs provided I believe a solution has been found).

I feel this part is out of my control; paho.mqtt.golang should allow users to detect this issue, otherwise it can be dramatic.

Unfortunately without access to your code, or the detailed logs that the library can produce, I cannot really know what the cause of the issue you are encountering is.

@yunfuyiren please review the common problems section of the readme and confirm that nothing in there applies. If that does not help then please raise a new issue (noting the reporting bugs section); the more information you can provide the more likely it is that someone will be able to help. A specific note based on the limited info you provided: try enabling keep-alives (this is needed due to the way TCP works).

@MattBrittan

Closing this issue as it's been quiet for three months.

@matthewfarstad

This is still happening to me even with the latest master and using SetOrderMatters(false). All my callbacks do is spawn a goroutine, and I'm not using channels anywhere yet. Any ideas?

@matthewfarstad

Aha! It was a Golang gotcha:

client.ClientOptions.SetPingTimeout(time.Duration(config.MQTT.Timeout) * time.Second)
client.ClientOptions.SetKeepAlive(time.Second * 30)

I had to multiply the duration by the unit I wanted; I thought I implicitly got seconds.
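
For anyone else who hits this, the gotcha is that converting a bare integer to time.Duration gives nanoseconds; a quick illustration (values here are only examples):

import "time"

func timeouts(seconds int) (wrong, right time.Duration) {
        // Wrong: time.Duration(30) is 30 nanoseconds, so the keep-alive /
        // ping timeout is effectively zero.
        wrong = time.Duration(seconds)

        // Right: multiply by the unit you want.
        right = time.Duration(seconds) * time.Second
        return
}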
