Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retained messages are not delivered on autoreconnect #446

Closed
rkintzi opened this issue Aug 4, 2020 · 9 comments
Closed

Retained messages are not delivered on autoreconnect #446

rkintzi opened this issue Aug 4, 2020 · 9 comments

Comments

@rkintzi
Copy link

rkintzi commented Aug 4, 2020

I have two MQTT applications:

  1. Embedded application written in C++
  2. Golang application based on the library from this repo.

Here is the scenario:

  1. Both applications are connected to the broker (Mosquitto), all messages were delivered.
  2. I stop the broker. Both apps notice that immediately (it is normal stop so broker has an occasion to close connections):
    a. Embedded application changes its internal status but for obvious reasons can not broadcast the fact.
    b. Golang app does nothing (just waits for the broker and tries to reconnect).
  3. I start the broker:
    a. The embedded application reconnects first (just a matter of luck) and publishes its new status. Messages are marked as retained and the broker receives them.
    b. Then, the golang application reconnects but does not receive messages published by the embedded application (in step 3a). Reconnection happens automatically (opts.AutoReconnect = true)

I also have a third application mosquitto_sub which always receives messages published in step 3a. above. Unfortunately, I can not tell if it reconnects before the embedded app or after it. If the Golang app reconnects before the embedded one, messages are delivered.

My question is if this is a normal situation or I misconfigured something?

@rkintzi rkintzi changed the title Retained messages are not delivered on autoreconnect. Retained messages are not delivered on autoreconnect Aug 4, 2020
@MattBrittan
Copy link
Contributor

Hi @rkintzi,

My expectation (with limited details as to your configuration) is that Mosquitto should deliver the message and your golang app should receive it upon reconnection. However as you have not provided any code or logs it's difficult to provide guidance. Does the golang app receive messages published after it has restarted or is it just the retained message that is not received? I would firstly check that you are setting CleanSession to false (if true then the broker will drop your session when the connection is lost and upon reconnection there will be no active subscriptions so you would not receive the retained message or any future messages).

Note: This question may be a better fit for one of the resources covered in the readme or stackoverflow. When raising issues here please include code (at a minimum how the options are set and how you are subscribing) and logs so the issue can be diagnosed.

Matt

@rkintzi
Copy link
Author

rkintzi commented Aug 5, 2020

Hi Matt

Thanks for your quick reaction. I appreciate.
Here are example code that replicates the issue (main.go):

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	var (
		addr string
		port string
		ID   string
	)
	flag.StringVar(&addr, "addr", "127.0.0.1", "mqtt broker addr")
	flag.StringVar(&port, "port", "1883", "mqtt broker port")
	flag.StringVar(&ID, "id", "8d8c2a8c-fa09-492e-b732-9bf1eb0181a1", "client ID")

	flag.Parse()

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%s", addr, port))
	opts.SetClientID(ID)
	opts.SetCleanSession(false)
	opts.SetKeepAlive(15 * time.Second)
	opts.AutoReconnect = true
	opts.SetConnectionLostHandler(func(cli mqtt.Client, err error) {
		log.Printf("Connection lost: %v", err)
	})
	opts.SetOnConnectHandler(func(cli mqtt.Client) {
		log.Printf("Connected to broker")
	})

	cli := mqtt.NewClient(opts)

	if token := cli.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Failed to connect: %v", token.Error())
	}
	cli.Subscribe("app/+/status/+", 1, func(cli mqtt.Client, msg mqtt.Message) {
		log.Printf("Message received on topic \"%s\": %s", msg.Topic(), string(msg.Payload()))
	})

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	sig := <-ch
	log.Printf("Interupted with signal %d", sig)

	cli.Disconnect(250)
}

On a first console I run following script (with logs):

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v  | \
while read line; do \
    echo `date -Iseconds` $line; \
done;
2020-08-05T11:10:36+02:00 app/1/status/1 0

It publishes message "0" (qos 0, retained) and then subscribes for a messages and prints them with timestamps. This is a baseline.

On a second console I run above program:

[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 11:10:37 Connected to broker
2020/08/05 11:10:37 Message received on topic "app/1/status/1": 0

So far so good. But when on a third console I run ( this one is on broker host so I don't have to specify an address):

$ sudo systemctl stop mosquitto; \
sleep 1; \
sudo systemctl start mosquitto; \
mosquitto_pub -r -q 0 -t app/1/status/1 -m 10; \
sleep 20; \
mosquitto_pub -r -q 0 -t app/1/status/1 -m 11

I receive on the first console (the one where mosquitto_sub is ran):

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v  | \
while read line; do \
    echo `date -Iseconds` $line; \
done;
2020-08-05T11:10:36+02:00 app/1/status/1 0
2020-08-05T11:10:41+02:00 app/1/status/1 10
2020-08-05T11:11:00+02:00 app/1/status/1 11

and on the second (with example program):

[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 11:10:37 Connected to broker
2020/08/05 11:10:37 Message received on topic "app/1/status/1": 0
2020/08/05 11:10:39 Connection lost: EOF
2020/08/05 11:10:42 Connected to broker
2020/08/05 11:11:00 Message received on topic "app/1/status/1": 11
2020/08/05 11:11:05 Interupted with signal 2
  1. Every message is sent with qos 0 and marked retained.
  2. mosquitto_sub from the first console received every single message
  3. The golang app did not received second message, that was sent while the app was disconnected
  4. The golang app received third message that was sent after the app had reconnected
  5. Setting CleanSession to false changed nothing

And just for the sake of completeness (go.mod):

module mqtttest

go 1.14

require (
	github.com/eclipse/paho.mqtt.golang v1.2.0
	golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
	google.golang.org/appengine v1.6.6
)

@rkintzi
Copy link
Author

rkintzi commented Aug 5, 2020

One more log I missed before (mosqutto.log):

$ sudo tail -f -n100 /var/log/mosquitto/mosquitto.log | \
while read d t; do d=`echo $d|sed -e"s/://"`; echo `date -Isecond -d @$d` $t; done
2020-08-05T11:43:05+01:00 New client connected from ::1 as mosqpub|3070-gs (c1, k60).
2020-08-05T11:43:05+01:00 Client mosqpub|3070-gs disconnected.
2020-08-05T11:43:09+01:00 Client mosq-BwSW7AaJPxhbB9gJwU disconnected.
2020-08-05T11:43:10+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:10+01:00 New client connected from 192.168.1.14 as mosq-dNszbbC9U6qCVaTaO7 (c1, k60).
2020-08-05T11:43:10+01:00 Client mosq-dNszbbC9U6qCVaTaO7 disconnected.
2020-08-05T11:43:10+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:10+01:00 New client connected from 192.168.1.14 as mosq-brS0cbNZeSS4eBBsff (c1, k60).
2020-08-05T11:43:11+01:00 Client 2C:F4:32:44:D7:39 has exceeded timeout, disconnecting.
2020-08-05T11:43:11+01:00 Socket error on client 2C:F4:32:44:D7:39, disconnecting.
2020-08-05T11:43:12+01:00 Client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 disconnected.
2020-08-05T11:43:13+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:13+01:00 Socket error on client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1, disconnecting.
2020-08-05T11:43:13+01:00 New client connected from 192.168.1.14 as 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 (c0, k15).
2020-08-05T11:43:20+01:00 mosquitto version 1.5.7 terminating
2020-08-05T11:43:21+01:00 mosquitto version 1.5.7 starting
2020-08-05T11:43:21+01:00 Config loaded from /etc/mosquitto/mosquitto.conf.
2020-08-05T11:43:21+01:00 Opening ipv4 listen socket on port 1883.
2020-08-05T11:43:21+01:00 Opening ipv6 listen socket on port 1883.
2020-08-05T11:43:21+01:00 New connection from ::1 on port 1883.
2020-08-05T11:43:21+01:00 New client connected from ::1 as mosqpub|3166-gs (c1, k60).
2020-08-05T11:43:21+01:00 Client mosqpub|3166-gs disconnected.
2020-08-05T11:43:22+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:22+01:00 New client connected from 192.168.1.14 as mosq-brS0cbNZeSS4eBBsff (c1, k60).
2020-08-05T11:43:23+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:23+01:00 Socket error on client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1, disconnecting.
2020-08-05T11:43:23+01:00 New client connected from 192.168.1.14 as 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 (c0, k15).
2020-08-05T11:43:41+01:00 New connection from ::1 on port 1883.

And logs from the same experiment as above (run at the same time when above entries where gathered; note different time zones; +02:00 if not specified):

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v  | \
while read line; do \
    echo `date -Iseconds` $line; \
done
2020-08-05T12:43:11+02:00 app/1/status/1 0
2020-08-05T12:43:22+02:00 app/1/status/1 10
2020-08-05T12:43:42+02:00 app/1/status/1 11
[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 12:43:13 Connected to broker
2020/08/05 12:43:13 Message received on topic "app/1/status/1": 0
2020/08/05 12:43:20 Connection lost: EOF
2020/08/05 12:43:23 Connected to broker
2020/08/05 12:43:42 Message received on topic "app/1/status/1": 11

All the remarks from previous comment holds, but one more thing can be observed:
6. The golang app has some trouble to reconnect (third line from the bottom in mosquitto.log).

Radek

@MattBrittan
Copy link
Contributor

Thanks very much for the detailed logs. Looking at your go.mod (always worth including this!) I note that you are using version 1.2.0; while this is the latest release it is now fairly old and there have been fairly major changes (e.g. the refactoring/tidy-up exercise which was specifically to deal with this kind of thing) in the meantime. This means that attempting to diagnose the issue on release 1.2.0 is probably of limited worth (sorry!).

Can you please try the current code (go get github.com/eclipse/paho.mqtt.golang@master); I hope there will be a new release shortly (I don't have commit rights on this project).

@rkintzi
Copy link
Author

rkintzi commented Aug 6, 2020

Upgrading to master does not help. The golang application still does not receive the second message.

I introduced a few changes to the test procedure:

  1. Logs verbosity was increased in mosquitto configuration (config file attached below).
  2. The test program was modified to end automatically (a new version attached below).
  3. mosquitto_sub is now not run to reduce the amount of log entries (this subscriber works correctly and receives all three messages).
  4. All components are run on a single machine and write messages to the system journal, which combines them in a single stream.

Here are new logs:

2020-08-06T14:51:49+0200 script[190668]: Test started at: 2020-08-06T14:51:49+02:00
2020-08-06T14:51:49+0200 script[190668]: mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 0
2020-08-06T14:51:49+0200 mosquitto[190674]: 1596718309: No will message specified.
2020-08-06T14:51:49+0200 mosquitto[190674]: 1596718309: Sending CONNACK to test_script (0, 0)
2020-08-06T14:51:49+0200 mosquitto[190674]: 1596718309: Received PUBLISH from test_script (d0, q0, r1, m0, 'app/1/status/1', ... (1 bytes))
2020-08-06T14:51:49+0200 mosquitto[190674]: 1596718309: Received DISCONNECT from test_script
2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311: No will message specified.
2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311: Sending CONNACK to mqtttest_program (1, 0)
2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311: Received SUBSCRIBE from mqtttest_program 2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311:         app/+/status/+ (QoS 1)
2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311: Sending SUBACK to mqtttest_program
2020-08-06T14:51:51+0200 mosquitto[190674]: 1596718311: Sending PUBLISH to mqtttest_program (d0, q0, r1, m0, 'app/1/status/1', ... (1 bytes))
2020-08-06T14:51:51+0200 mqtttest[190680]: Connected to broker
2020-08-06T14:51:51+0200 mqtttest[190680]: Message received on topic "app/1/status/1": 0
2020-08-06T14:51:51+0200 script[190668]: sudo systemctl stop mosquitto.service
2020-08-06T14:51:52+0200 mqtttest[190680]: Connection lost: EOF
2020-08-06T14:51:53+0200 script[190668]: sudo systemctl start mosquitto.service
2020-08-06T14:51:54+0200 script[190668]: mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 10
2020-08-06T14:51:54+0200 mosquitto[190697]: 1596718314: No will message specified.
2020-08-06T14:51:54+0200 mosquitto[190697]: 1596718314: Sending CONNACK to test_script (0, 0)
2020-08-06T14:51:54+0200 mosquitto[190697]: 1596718314: Received PUBLISH from test_script (d0, q0, r1, m0, 'app/1/status/1', ... (2 bytes))
2020-08-06T14:51:54+0200 mosquitto[190697]: 1596718314: Received DISCONNECT from test_script
2020-08-06T14:51:55+0200 mosquitto[190697]: 1596718315: No will message specified.
2020-08-06T14:51:55+0200 mosquitto[190697]: 1596718315: Sending CONNACK to mqtttest_program (1, 0)
2020-08-06T14:51:55+0200 mqtttest[190680]: Connected to broker
2020-08-06T14:52:10+0200 mosquitto[190697]: 1596718330: Received PINGREQ from mqtttest_program
2020-08-06T14:52:10+0200 mosquitto[190697]: 1596718330: Sending PINGRESP to mqtttest_program
2020-08-06T14:52:14+0200 script[190668]: mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 100
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: No will message specified.
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: Sending CONNACK to test_script (0, 0)
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: Received PUBLISH from test_script (d0, q0, r1, m0, 'app/1/status/1', ... (3 bytes))
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: Sending PUBLISH to mqtttest_program (d0, q0, r0, m0, 'app/1/status/1', ... (3 bytes))
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: Received DISCONNECT from test_script
2020-08-06T14:52:14+0200 mosquitto[190697]: 1596718334: Received DISCONNECT from mqtttest_program
2020-08-06T14:52:14+0200 mqtttest[190680]: Message received on topic "app/1/status/1": 100
2020-08-06T14:52:14+0200 mqtttest[190680]: End message received
2020-08-06T14:52:14+0200 script[190668]: Test ended at: 2020-08-06T14:52:14+02:00

Mosquitto config file:

persistence true
persistence_location /var/lib/mosquitto/
log_type debug error warning notice information subscribe unsubscribe

script.sh:

#!/bin/bash
run() {
    echo $@
    $@
}

run sudo systemctl restart mosquitto.service
sleep 5
echo Test started at: `date -Iseconds`
run mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 0
echo !!!
echo !!! please start mqttest and press enter to continue
echo !!!
read f
run sudo systemctl stop mosquitto.service
sleep 1
run sudo systemctl start mosquitto.service
sleep 1
run mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 10
sleep 20
run mosquitto_pub -i test_script -r -q 0 -t app/1/status/1 -m 100
echo Test ended at: `date -Iseconds`

New test program (main.go):

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	var (
		addr string
		port string
		ID   string
	)
	flag.StringVar(&addr, "addr", "127.0.0.1", "mqtt broker addr")
	flag.StringVar(&port, "port", "1883", "mqtt broker port")
	flag.StringVar(&ID, "id", "mqtttest_program", "client ID")

	flag.Parse()

	log.SetFlags(0)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%s", addr, port))
	opts.SetClientID(ID)
	opts.SetKeepAlive(15 * time.Second)
	opts.AutoReconnect = true
	opts.CleanSession = false
	opts.SetConnectionLostHandler(func(cli mqtt.Client, err error) {
		log.Printf("Connection lost: %v", err)
	})
	opts.SetOnConnectHandler(func(cli mqtt.Client) {
		log.Printf("Connected to broker")
	})

	cli := mqtt.NewClient(opts)
	if token := cli.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Failed to connect: %v", token.Error())
	}
	end := make(chan struct{})
	cli.Subscribe("app/+/status/+", 1, func(cli mqtt.Client, msg mqtt.Message) {
		p := string(msg.Payload())
		log.Printf("Message received on topic \"%s\": %s", msg.Topic(), p)
		if p == "100" {
			end <- struct{}{}
		}
	})

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	select {
	case sig := <-ch:
		log.Printf("Interupted with signal %d", sig)
	case <-end:
		log.Printf("End message received")
	}
	cli.Disconnect(250)
}

go.mod:

module mqtttest

go 1.14

require (
	github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200609161119-ca94c5368c77
	golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
	google.golang.org/appengine v1.6.6
)

@rkintzi
Copy link
Author

rkintzi commented Aug 6, 2020

I have increased verbosity in test application: https://gist.github.com/rkintzi/5db6cea978057484a9c37b67b9a872e9

(edited: logs moved to linked gist)

@rkintzi
Copy link
Author

rkintzi commented Aug 6, 2020

I see no sign in the logs that Mosquitto is trying to deliver the missing message to the test app. I don't know if this is a bug in a Mosquitto, library or test application. Or maybe it is an expected behavior?

Two additional remarks:

  1. Changing the CleanSession option affects the program behavior in an expected way (previously I said it is not, but it was my mistake).
  2. When I call the Subscribe method in OnConnectHandler the program works correctly (no matter how CleanSession option is set).

@MattBrittan
Copy link
Contributor

Looking into this further what you are seeing makes sense; from the mqtt spec:

When a new subscription is established, the last retained message, if any, on each matching topic name MUST be sent to the subscriber [MQTT-3.3.1-6].

So, as per the spec, Mosquitto will only deliver a retained message when a new subscription is established (not upon a connection being re-established). The auto-reconnection code does not resubscribe (there is generally no need and the code has no way of knowing what topics should be active) but due to CleanSession being false existing subscriptions will remain active. This means you will not receive the will messages (unless you resubscribe in the OnConnectHandler; which is a good idea in any event).

So the approach you have discovered should work; however I must add that if you want guaranteed delivery then you should be using a higher QOS level (at QOS 0 storing the retained message is a "should" rather than a "will" in the spec). Apologies for not picking this up earlier (I rarely use QOS 0 so have not experienced this myself).

@rkintzi
Copy link
Author

rkintzi commented Aug 10, 2020

Thanks Matt for pushing me in the right direction. I close the issue as it's definitely not a library problem (I even reimplemented the test program in Dart lang and it works exactly the same way).

R.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants