the paho client appear no ack after receive message #441

Closed
apolloyang2017 opened this issue Jul 14, 2020 · 9 comments

Comments

@apolloyang2017

apolloyang2017 commented Jul 14, 2020

The paho client suddenly stops sending ACKs for received messages, and this continues until I restart the program that uses the client.
The following is the trace log from the emqx server before restarting the paho client:
openservice.fault.log
The following is the trace log from the emqx server after restarting the paho client:
openservice.ok.log

There is no error information in the log; it looks as if the paho client is deadlocked. I had previously upgraded the paho client to pick up the fix for the following bug:
0526fed

Can you tell me whether there is another problem in the paho client?

@MattBrittan
Contributor

MattBrittan commented Jul 14, 2020

Sorry - more focused information will be required to assist you with this. Could you please:

  • Extract a few relevant sections from the log files you submitted that clearly show the issue (one showing what it looks like when it's working and another after the failure). Currently the logs are pretty meaningless to me (unless they are just showing that no PUBACK is received in the first log).
  • Turn on logging within the paho library and include the relevant section of the log (see my response on this issue re enabling logging).
  • Include the relevant parts of your application (i.e. if the issue is with PUBACK then show how you connect, subscribe and handle messages). This package has a lot of options which significantly change its behavior, so it's hard to comment without knowing how you are using it.
  • State what version of the library you are using (and try the latest - go get github.com/eclipse/paho.mqtt.golang@master if using modules). From your message it looks like you are using @master already (you mention a commit).

However, the best way to get a resolution is to try to simplify your code so you can provide a minimal reproducible example. Often when creating such an example I find the issue within my own code (and if it is a bug in the paho client, then having something I can use to duplicate it makes it much easier to solve).

Assuming you are receiving messages and mean the PUBACK, then this is only sent after the message handler returns (or never if there is no message handler) - see here. Most of the time when this issue is raised, that is the problem (i.e. your handler is not returning for whatever reason).

There is also an outstanding pull request (#434) which could be related (but this is mainly focused on the issues whilst resuming).

Note: If you are looking for more general help with this you might have better luck asking on one of the resources mentioned in the readme or on Stack Overflow. There are only a few people who monitor issues here (and I have no experience with emqx).

@apolloyang2017
Author

Yes, I am using the latest source from master.
I will turn on logging within the paho library; if the problem appears again, I will send you the log info. Thank you~

@MattBrittan
Contributor

> I will send you the log info

Note that it would also be worth logging your handlers (entry and exit). As mentioned, this issue is often due to handlers in the user's code that do not exit.

@apolloyang2017
Author

apolloyang2017 commented Aug 25, 2020

hi, MattBrittan:

following is my trace code:

func StartService() error {
	l := logger.Get()
	l.Debug("start connect MQTT server ...")
	c := MQTT.NewClient(opts)
	c.AddRoute("/c2s/rpc/#", c2sRpcHandler)
	c.AddRoute("/c2s/async_reply/#", c2sAsyncReplyHandler)
	c.AddRoute("/c2s/async_stateless/#", c2sAsyncStatelessHandler)
	c.AddRoute("/c2s/message/#", c2sMessageHandler)
	c.AddRoute("/sc2s/rpc/#", c2sRpcHandler)
	c.AddRoute("/sc2s/async_reply/#", c2sAsyncReplyHandler)
	c.AddRoute("/sc2s/message/#", c2sMessageHandler)
	c.AddRoute("$SYS/brokers/+/clients/#", brokerClientsHandler)
	if config.Get().Common.Mode == "develop" {
		MQTT.CRITICAL = log.New(os.Stdout, "", 0)
		MQTT.DEBUG = log.New(os.Stdout, "", 0)
		MQTT.WARN = log.New(os.Stdout, "", 0)
		MQTT.ERROR = log.New(os.Stdout, "", 0)
	}
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	//following check alive
	go func() {
		alive := time.Now().Unix()
		c.Subscribe("test", 1, func(client MQTT.Client, msg MQTT.Message) {
			alive = time.Now().Unix()
		})

		for _ = range time.NewTicker(5 * time.Second).C {
			log := logger.Get()
			log.Debug("will send test message ...")
			if token := c.Publish("test", 1, false, ""); !token.WaitTimeout(3*time.Second) || token.Error() != nil {
				log.Error("!!! the mqtt connection appear fault !!!")
				if token.Error() != nil {
					log.Error(token.Error().Error())
				}
				os.Exit(1)
			}

			if time.Now().Unix() > alive+10 {
				log.Error("!!! It can't receive test message over 10 seconds !!!")
				os.Exit(1)
			}
		}
	}()

	return nil
}

@apolloyang2017
Author

2020-08-24T21:55:12.057+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:17.057+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:22.057+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:22.057+0800 error /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:125 !!! It can't receive test message over 10 seconds !!!

@apolloyang2017
Author

2020-08-24T21:55:05.999534734Z [net] Received Message
2020-08-24T21:55:05.999581437Z [net] Received Message
2020-08-24T21:55:05.999587245Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:05.999591575Z [net] received publish, msgId: 186
2020-08-24T21:55:05.999595445Z [net] logic waiting for msg on ibound
2020-08-24T21:55:05.999604160Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:05.999610740Z [net] received publish, msgId: 185
2020-08-24T21:55:05.999616700Z [net] logic waiting for msg on ibound
2020-08-24T21:55:05.999622296Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:05.999643212Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:06.230339731Z [net] Received Message
2020-08-24T21:55:06.230383741Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:06.230403972Z [net] received publish, msgId: 19091
2020-08-24T21:55:06.230444065Z [net] logic waiting for msg on ibound
2020-08-24T21:55:06.230478332Z [net] putting puback msg on obound
2020-08-24T21:55:06.230492034Z [store] memorystore del: message 19091 was deleted
2020-08-24T21:55:06.230528325Z [net] done putting puback msg on obound
2020-08-24T21:55:06.230535817Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:06.230635536Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:06.869064035Z [net] Received Message
2020-08-24T21:55:06.869087760Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:06.869151166Z [net] received publish, msgId: 19092
2020-08-24T21:55:06.869163084Z [net] logic waiting for msg on ibound
2020-08-24T21:55:06.869169170Z [net] putting puback msg on obound
2020-08-24T21:55:06.869173333Z [store] memorystore del: message 19092 was deleted
2020-08-24T21:55:06.869177142Z [net] done putting puback msg on obound
2020-08-24T21:55:06.869221589Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:06.869254479Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:06.947433874Z [net] Received Message
2020-08-24T21:55:06.947477645Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:06.947485986Z [net] received publish, msgId: 35
2020-08-24T21:55:06.947492002Z [net] logic waiting for msg on ibound
2020-08-24T21:55:06.947566142Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:07.760876154Z [pinger] ping check 0.891502213
2020-08-24T21:55:07.761532961Z 2020-08-25T05:55:07.761+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:07.761553979Z [client] enter Publish
2020-08-24T21:55:07.761558855Z [client] sending publish message, topic: test
2020-08-24T21:55:07.761562634Z [net] obound wrote msg, id: 1
2020-08-24T21:55:07.761566515Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:07.761910281Z [net] Received Message
2020-08-24T21:55:07.761934044Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:07.761994406Z [store] memorystore del: message 1 was deleted
2020-08-24T21:55:07.762038049Z [net] received puback, id: 1
2020-08-24T21:55:07.762046188Z [net] logic waiting for msg on ibound
2020-08-24T21:55:07.762111626Z [net] Received Message
2020-08-24T21:55:07.762123967Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:07.762139805Z [net] received publish, msgId: 19093
2020-08-24T21:55:07.762194674Z [net] logic waiting for msg on ibound
2020-08-24T21:55:07.762207671Z [net] putting puback msg on obound
2020-08-24T21:55:07.762214863Z [store] memorystore del: message 19093 was deleted
2020-08-24T21:55:07.762220790Z [net] done putting puback msg on obound
2020-08-24T21:55:07.762240859Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:07.762303702Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:08.159429934Z [net] Received Message
2020-08-24T21:55:08.159474222Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:08.159482043Z [net] received publish, msgId: 6689
2020-08-24T21:55:08.159487704Z [net] logic waiting for msg on ibound
2020-08-24T21:55:08.159557101Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:11.230176785Z [net] Received Message
2020-08-24T21:55:11.230193646Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:11.230198489Z [net] received publish, msgId: 19094
2020-08-24T21:55:11.230202340Z [net] logic waiting for msg on ibound
2020-08-24T21:55:11.230263421Z [net] putting puback msg on obound
2020-08-24T21:55:11.230286525Z [store] memorystore del: message 19094 was deleted
2020-08-24T21:55:11.230291038Z [net] done putting puback msg on obound
2020-08-24T21:55:11.230294835Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:11.230313646Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:11.869037604Z [net] Received Message
2020-08-24T21:55:11.869062110Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:11.869069677Z [net] received publish, msgId: 19095
2020-08-24T21:55:11.869075696Z [net] logic waiting for msg on ibound
2020-08-24T21:55:11.869135923Z [net] putting puback msg on obound
2020-08-24T21:55:11.869147355Z [store] memorystore del: message 19095 was deleted
2020-08-24T21:55:11.869153564Z [net] done putting puback msg on obound
2020-08-24T21:55:11.869159869Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:11.869217442Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:12.202447128Z [net] Received Message
2020-08-24T21:55:12.202480274Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.202501102Z [net] received publish, msgId: 6805
2020-08-24T21:55:12.202505741Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.202509524Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:12.760854855Z [pinger] ping check 0.89157406
2020-08-24T21:55:12.761325326Z 2020-08-25T05:55:12.761+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:12.761419974Z [client] enter Publish
2020-08-24T21:55:12.761436737Z [client] sending publish message, topic: test
2020-08-24T21:55:12.761543814Z [net] obound wrote msg, id: 1
2020-08-24T21:55:12.761556591Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:12.762095229Z [net] Received Message
2020-08-24T21:55:12.762109989Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.762117291Z [store] memorystore del: message 1 was deleted
2020-08-24T21:55:12.762123159Z [net] received puback, id: 1
2020-08-24T21:55:12.762137700Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.762194868Z [net] Received Message
2020-08-24T21:55:12.762203603Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.762207445Z [net] received publish, msgId: 19096
2020-08-24T21:55:12.762211051Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.762270148Z [net] putting puback msg on obound
2020-08-24T21:55:12.762281225Z [store] memorystore del: message 19096 was deleted
2020-08-24T21:55:12.762285181Z [net] done putting puback msg on obound
2020-08-24T21:55:12.762288999Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:12.762337498Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:12.936448332Z [net] Received Message
2020-08-24T21:55:12.936481186Z [net] Received Message
2020-08-24T21:55:12.936485739Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.936604779Z [net] received publish, msgId: 7
2020-08-24T21:55:12.936624431Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.936632075Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.936637386Z [net] received publish, msgId: 6
2020-08-24T21:55:12.936642584Z [net] Received Message
2020-08-24T21:55:12.936648345Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.936654413Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:12.936660122Z [net] received publish, msgId: 5
2020-08-24T21:55:12.936666149Z [net] logic waiting for msg on ibound
2020-08-24T21:55:12.936688814Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:12.936741734Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:12.936753198Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:13.842656481Z [net] Received Message
2020-08-24T21:55:13.842703698Z [net] Received Message
2020-08-24T21:55:13.842712172Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:13.842718309Z [net] received publish, msgId: 142
2020-08-24T21:55:13.842724168Z [net] logic waiting for msg on ibound
2020-08-24T21:55:13.842729498Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:13.842846020Z [net] received publish, msgId: 141
2020-08-24T21:55:13.842865943Z [net] Received Message
2020-08-24T21:55:13.842872925Z [net] logic waiting for msg on ibound
2020-08-24T21:55:13.842878198Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:13.842883684Z [net] received publish, msgId: 122
2020-08-24T21:55:13.842897175Z [net] Received Message
2020-08-24T21:55:13.842903231Z [net] logic waiting for msg on ibound
2020-08-24T21:55:13.842909095Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:13.842922048Z [net] received publish, msgId: 121
2020-08-24T21:55:13.842928056Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:13.842997713Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:13.843008544Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:13.843012713Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:13.843016559Z [net] logic waiting for msg on ibound
2020-08-24T21:55:15.689481443Z [net] Received Message
2020-08-24T21:55:15.689538440Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:15.689553233Z [net] received publish, msgId: 2985
2020-08-24T21:55:15.689559173Z [net] logic waiting for msg on ibound
2020-08-24T21:55:15.689564889Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:15.918606665Z [net] Received Message
2020-08-24T21:55:15.918646457Z [net] Received Message
2020-08-24T21:55:15.918652163Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:15.918656127Z [net] received publish, msgId: 357
2020-08-24T21:55:15.918674810Z [net] logic waiting for msg on ibound
2020-08-24T21:55:15.918679187Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:15.918682942Z [net] received publish, msgId: 159
2020-08-24T21:55:15.918686528Z [net] logic waiting for msg on ibound
2020-08-24T21:55:15.918690236Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:15.918694122Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:16.230126147Z [net] Received Message
2020-08-24T21:55:16.230140447Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:16.230144707Z [net] received publish, msgId: 19097
2020-08-24T21:55:16.230148446Z [net] logic waiting for msg on ibound
2020-08-24T21:55:16.230227900Z [net] putting puback msg on obound
2020-08-24T21:55:16.230237598Z [store] memorystore del: message 19097 was deleted
2020-08-24T21:55:16.230241582Z [net] done putting puback msg on obound
2020-08-24T21:55:16.230246103Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:16.230301360Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:16.868987204Z [net] Received Message
2020-08-24T21:55:16.869003800Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:16.869008569Z [net] received publish, msgId: 19098
2020-08-24T21:55:16.869012538Z [net] logic waiting for msg on ibound
2020-08-24T21:55:16.869122333Z [net] putting puback msg on obound
2020-08-24T21:55:16.869135825Z [store] memorystore del: message 19098 was deleted
2020-08-24T21:55:16.869142334Z [net] done putting puback msg on obound
2020-08-24T21:55:16.869148558Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:16.869196010Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:17.760827655Z [pinger] ping check 0.891540444
2020-08-24T21:55:17.761329220Z 2020-08-25T05:55:17.761+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:17.761619330Z [client] enter Publish
2020-08-24T21:55:17.761636513Z [client] sending publish message, topic: test
2020-08-24T21:55:17.761644201Z [net] obound wrote msg, id: 1
2020-08-24T21:55:17.761650771Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:17.762441742Z [net] Received Message
2020-08-24T21:55:17.762457557Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:17.762462661Z [store] memorystore del: message 1 was deleted
2020-08-24T21:55:17.762466901Z [net] received puback, id: 1
2020-08-24T21:55:17.762487268Z [net] logic waiting for msg on ibound
2020-08-24T21:55:17.762497917Z [net] Received Message
2020-08-24T21:55:17.762503912Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:17.762509614Z [net] received publish, msgId: 19099
2020-08-24T21:55:17.762516298Z [net] logic waiting for msg on ibound
2020-08-24T21:55:17.762521913Z [net] putting puback msg on obound
2020-08-24T21:55:17.762527315Z [store] memorystore del: message 19099 was deleted
2020-08-24T21:55:17.762530994Z [net] done putting puback msg on obound
2020-08-24T21:55:17.762534786Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:17.762539349Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:21.230488917Z [net] Received Message
2020-08-24T21:55:21.230512164Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:21.230517108Z [net] received publish, msgId: 19100
2020-08-24T21:55:21.230520877Z [net] logic waiting for msg on ibound
2020-08-24T21:55:21.230524668Z [net] putting puback msg on obound
2020-08-24T21:55:21.230528302Z [store] memorystore del: message 19100 was deleted
2020-08-24T21:55:21.230536208Z [net] done putting puback msg on obound
2020-08-24T21:55:21.230540250Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:21.230627972Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:21.725520770Z [net] Received Message
2020-08-24T21:55:21.725553850Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:21.725600716Z [net] received publish, msgId: 10770
2020-08-24T21:55:21.725608557Z [net] logic waiting for msg on ibound
2020-08-24T21:55:21.725613505Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:21.869114499Z [net] Received Message
2020-08-24T21:55:21.869133999Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:21.869231130Z [net] received publish, msgId: 19101
2020-08-24T21:55:21.869258169Z [net] logic waiting for msg on ibound
2020-08-24T21:55:21.869266193Z [net] putting puback msg on obound
2020-08-24T21:55:21.869274049Z [store] memorystore del: message 19101 was deleted
2020-08-24T21:55:21.869278602Z [net] done putting puback msg on obound
2020-08-24T21:55:21.869286402Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:21.869337241Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:21.905612960Z [net] Received Message
2020-08-24T21:55:21.905646784Z [net] Received Message
2020-08-24T21:55:21.905666211Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:21.905688861Z [net] received publish, msgId: 333
2020-08-24T21:55:21.905706753Z [net] logic waiting for msg on ibound
2020-08-24T21:55:21.905711315Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:21.905757743Z [net] received publish, msgId: 302
2020-08-24T21:55:21.905776130Z [net] logic waiting for msg on ibound
2020-08-24T21:55:21.905780433Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:21.905784384Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:22.760957210Z [pinger] ping check 0.89143763
2020-08-24T21:55:22.761328714Z 2020-08-25T05:55:22.761+0800 debug /pipeline/source/src/isurpass/openservice/mqttservice/mqttservice.go:115 will send test message ...
2020-08-24T21:55:22.761431211Z [client] enter Publish
2020-08-24T21:55:22.761446770Z [client] sending publish message, topic: test
2020-08-24T21:55:22.762183973Z [net] obound wrote msg, id: 1
2020-08-24T21:55:22.762200063Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:22.762207664Z [net] Received Message
2020-08-24T21:55:22.762213921Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:22.762232726Z [store] memorystore del: message 1 was deleted
2020-08-24T21:55:22.762239170Z [net] received puback, id: 1
2020-08-24T21:55:22.762245134Z [net] logic waiting for msg on ibound
2020-08-24T21:55:22.762250882Z [net] Received Message
2020-08-24T21:55:22.762264065Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:22.762353037Z [net] received publish, msgId: 19102
2020-08-24T21:55:22.762369319Z [net] logic waiting for msg on ibound
2020-08-24T21:55:22.762375749Z [net] putting puback msg on obound
2020-08-24T21:55:22.762381843Z [store] memorystore del: message 19102 was deleted
2020-08-24T21:55:22.762388326Z [net] done putting puback msg on obound
2020-08-24T21:55:22.762417519Z [net] obound priority msg to write, type *packets.PubackPacket
2020-08-24T21:55:22.762423720Z [net] outgoing waiting for an outbound message
2020-08-24T21:55:22.877473816Z [net] Received Message
2020-08-24T21:55:22.877514620Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:22.877520491Z [net] received publish, msgId: 212
2020-08-24T21:55:22.877757778Z [net] logic waiting for msg on ibound
2020-08-24T21:55:22.877773826Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:23.888434333Z [net] Received Message
2020-08-24T21:55:23.888488840Z [net] Received Message
2020-08-24T21:55:23.888497871Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:23.888503895Z [net] received publish, msgId: 17
2020-08-24T21:55:23.888509438Z [net] logic waiting for msg on ibound
2020-08-24T21:55:23.888515697Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:23.888589850Z [net] received publish, msgId: 16
2020-08-24T21:55:23.888607013Z [net] logic waiting for msg on ibound
2020-08-24T21:55:23.888611792Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:23.888615805Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:24.221344830Z [net] Received Message
2020-08-24T21:55:24.221383938Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:24.221495528Z [net] received publish, msgId: 6771
2020-08-24T21:55:24.221516028Z [net] logic waiting for msg on ibound
2020-08-24T21:55:24.221524056Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.
2020-08-24T21:55:24.607500611Z [net] Received Message
2020-08-24T21:55:24.607543889Z [net] startIncommingComms: got msg on ibound
2020-08-24T21:55:24.607552159Z [net] received publish, msgId: 3004
2020-08-24T21:55:24.607631599Z [net] logic waiting for msg on ibound
2020-08-24T21:55:24.607653432Z [router] matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.

@MattBrittan
Contributor

Unfortunately I cannot spot anything obvious in the logs but do have a couple of suggestions below. In order to assist further I think I'd need a minimal, reproducible example application that demonstrates the issue (perhaps using broker.emqx.io). These issues can be difficult to track down and, unfortunately, with the limited amount of code you have provided I cannot see anything actionable (note that the issue may not be due to a problem with the go library).

As far as I can see the go library is running OK; the ping checks exercise input/output and responses are fairly rapid; e.g.:

2020-08-24T21:55:22.760957210Z [pinger] ping check 0.89143763

I would also note that a network issue could result in your "check alive" message being lost and, because you are sending one every 5 seconds and raising an error if nothing is received for 10 seconds, all it would really take is one missing message (I'm unsure if emqx will resend a message that has not been acknowledged; the spec only requires that unacknowledged messages be resent when the connection is re-established).

Default Message Handler

From the log: matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.

This means that the client is receiving messages for which there is no subscription, so the message is totally ignored and no ACK will be sent. If you want to see what these messages are (and send an ACK), use SetDefaultPublishHandler; below is an example that just logs what is received (I'm using zerolog but you should get the idea...).

opts.SetDefaultPublishHandler(
	func(client mqttlib.Client, msg mqttlib.Message) {
		log.Error().
			Str("func", "manageBrokerConnection").
			Str("topic", msg.Topic()).
			Bytes("payload", msg.Payload()).
			Msg("Received unexpected message on default handler")
	})

You did not mention how long the application runs before it stops, but one possibility is that the broker is exhausting the available Packet Identifiers (called msgId in the log). There are 65535 available (a non-zero 16-bit value), and every time a message is left unacknowledged one of these is lost because the broker is waiting for a response it will never receive. I note that the packet identifiers being received from emqx are jumping around a bit (but this may be normal; I have no experience with emqx, and the MQTT spec does not specify how packet identifiers should be allocated, just that they must be unique).

Check alive

c.Subscribe("test", 1, func(client MQTT.Client, msg MQTT.Message) {
			alive = time.Now().Unix()
		})

There is a potential data race here: separate goroutines read and write alive (your code is very close to this example). This is probably not causing an issue here, but as races can be unpredictable it would be best to change this.

@apolloyang2017
Author

@MattBrittan, thank you very much. I will improve the code and continue tracing ~

@MattBrittan
Contributor

Closing this due to inactivity; if the issue is still occurring and you have further info then feel free to re-open.
