Retained messages are not delivered on autoreconnect #446
Hi @rkintzi,

My expectation (with limited details as to your configuration) is that Mosquitto should deliver the message and your golang app should receive it upon reconnection. However, as you have not provided any code or logs, it's difficult to provide guidance. Does the golang app receive messages published after it has restarted, or is it just the retained message that is not received? I would firstly check that you are setting

Note: This question may be a better fit for one of the resources covered in the readme or Stack Overflow. When raising issues here, please include code (at a minimum how the options are set and how you are subscribing) and logs so the issue can be diagnosed.

Matt
Hi Matt,

Thanks for your quick reaction. I appreciate it.

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	var (
		addr string
		port string
		ID   string
	)
	flag.StringVar(&addr, "addr", "127.0.0.1", "mqtt broker addr")
	flag.StringVar(&port, "port", "1883", "mqtt broker port")
	flag.StringVar(&ID, "id", "8d8c2a8c-fa09-492e-b732-9bf1eb0181a1", "client ID")
	flag.Parse()

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%s", addr, port))
	opts.SetClientID(ID)
	opts.SetCleanSession(false)
	opts.SetKeepAlive(15 * time.Second)
	opts.AutoReconnect = true
	opts.SetConnectionLostHandler(func(cli mqtt.Client, err error) {
		log.Printf("Connection lost: %v", err)
	})
	opts.SetOnConnectHandler(func(cli mqtt.Client) {
		log.Printf("Connected to broker")
	})

	cli := mqtt.NewClient(opts)
	if token := cli.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Failed to connect: %v", token.Error())
	}
	cli.Subscribe("app/+/status/+", 1, func(cli mqtt.Client, msg mqtt.Message) {
		log.Printf("Message received on topic \"%s\": %s", msg.Topic(), string(msg.Payload()))
	})

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	sig := <-ch
	log.Printf("Interupted with signal %d", sig)
	cli.Disconnect(250)
}

On the first console I run the following script (shown with its output):

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v | \
while read line; do \
echo `date -Iseconds` $line; \
done;
2020-08-05T11:10:36+02:00 app/1/status/1 0

It publishes the message "0" (QoS 0, retained), then subscribes and prints each received message with a timestamp. This is the baseline. On a second console I run the program above:

[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 11:10:37 Connected to broker
2020/08/05 11:10:37 Message received on topic "app/1/status/1": 0

So far so good. But then, on a third console, I run (this one is on the broker host, so I don't have to specify an address):

$ sudo systemctl stop mosquitto; \
sleep 1; \
sudo systemctl start mosquitto; \
mosquitto_pub -r -q 0 -t app/1/status/1 -m 10; \
sleep 20; \
mosquitto_pub -r -q 0 -t app/1/status/1 -m 11

I receive on the first console (the one where

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v | \
while read line; do \
echo `date -Iseconds` $line; \
done;
2020-08-05T11:10:36+02:00 app/1/status/1 0
2020-08-05T11:10:41+02:00 app/1/status/1 10
2020-08-05T11:11:00+02:00 app/1/status/1 11

and on the second (with the example program):

[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 11:10:37 Connected to broker
2020/08/05 11:10:37 Message received on topic "app/1/status/1": 0
2020/08/05 11:10:39 Connection lost: EOF
2020/08/05 11:10:42 Connected to broker
2020/08/05 11:11:00 Message received on topic "app/1/status/1": 11
2020/08/05 11:11:05 Interupted with signal 2
And just for the sake of completeness (

module mqtttest

go 1.14

require (
	github.com/eclipse/paho.mqtt.golang v1.2.0
	golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
	google.golang.org/appengine v1.6.6
)
One more log I missed before (

$ sudo tail -f -n100 /var/log/mosquitto/mosquitto.log | \
while read d t; do d=`echo $d|sed -e"s/://"`; echo `date -Isecond -d @$d` $t; done
2020-08-05T11:43:05+01:00 New client connected from ::1 as mosqpub|3070-gs (c1, k60).
2020-08-05T11:43:05+01:00 Client mosqpub|3070-gs disconnected.
2020-08-05T11:43:09+01:00 Client mosq-BwSW7AaJPxhbB9gJwU disconnected.
2020-08-05T11:43:10+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:10+01:00 New client connected from 192.168.1.14 as mosq-dNszbbC9U6qCVaTaO7 (c1, k60).
2020-08-05T11:43:10+01:00 Client mosq-dNszbbC9U6qCVaTaO7 disconnected.
2020-08-05T11:43:10+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:10+01:00 New client connected from 192.168.1.14 as mosq-brS0cbNZeSS4eBBsff (c1, k60).
2020-08-05T11:43:11+01:00 Client 2C:F4:32:44:D7:39 has exceeded timeout, disconnecting.
2020-08-05T11:43:11+01:00 Socket error on client 2C:F4:32:44:D7:39, disconnecting.
2020-08-05T11:43:12+01:00 Client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 disconnected.
2020-08-05T11:43:13+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:13+01:00 Socket error on client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1, disconnecting.
2020-08-05T11:43:13+01:00 New client connected from 192.168.1.14 as 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 (c0, k15).
2020-08-05T11:43:20+01:00 mosquitto version 1.5.7 terminating
2020-08-05T11:43:21+01:00 mosquitto version 1.5.7 starting
2020-08-05T11:43:21+01:00 Config loaded from /etc/mosquitto/mosquitto.conf.
2020-08-05T11:43:21+01:00 Opening ipv4 listen socket on port 1883.
2020-08-05T11:43:21+01:00 Opening ipv6 listen socket on port 1883.
2020-08-05T11:43:21+01:00 New connection from ::1 on port 1883.
2020-08-05T11:43:21+01:00 New client connected from ::1 as mosqpub|3166-gs (c1, k60).
2020-08-05T11:43:21+01:00 Client mosqpub|3166-gs disconnected.
2020-08-05T11:43:22+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:22+01:00 New client connected from 192.168.1.14 as mosq-brS0cbNZeSS4eBBsff (c1, k60).
2020-08-05T11:43:23+01:00 New connection from 192.168.1.14 on port 1883.
2020-08-05T11:43:23+01:00 Socket error on client 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1, disconnecting.
2020-08-05T11:43:23+01:00 New client connected from 192.168.1.14 as 8d8c2a8c-fa09-492e-b732-9bf1eb0181a1 (c0, k15).
2020-08-05T11:43:41+01:00 New connection from ::1 on port 1883.

And logs from the same experiment as above (captured at the same time as the entries above; note the different time zones: +02:00 if not specified):

[~]$ mosquitto_pub -h 192.168.1.10 -r -q 0 -t app/1/status/1 -m 0; \
mosquitto_sub -h 192.168.1.10 -t app/+/status/+ -v | \
while read line; do \
echo `date -Iseconds` $line; \
done
2020-08-05T12:43:11+02:00 app/1/status/1 0
2020-08-05T12:43:22+02:00 app/1/status/1 10
2020-08-05T12:43:42+02:00 app/1/status/1 11

[~/mqtttest]$ ./mqtttest -addr 192.168.1.10
2020/08/05 12:43:13 Connected to broker
2020/08/05 12:43:13 Message received on topic "app/1/status/1": 0
2020/08/05 12:43:20 Connection lost: EOF
2020/08/05 12:43:23 Connected to broker
2020/08/05 12:43:42 Message received on topic "app/1/status/1": 11

All the remarks from the previous comment hold, but one more thing can be observed:

Radek
Thanks very much for the detailed logs. Looking at your

Can you please try the current code (
Upgrading to master does not help. The golang application still does not receive the second message. I introduced a few changes to the test procedure:
Here are new logs:
Mosquitto config file:

persistence true
persistence_location /var/lib/mosquitto/
log_type debug error warning notice information subscribe unsubscribe
New test program (

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	var (
		addr string
		port string
		ID   string
	)
	flag.StringVar(&addr, "addr", "127.0.0.1", "mqtt broker addr")
	flag.StringVar(&port, "port", "1883", "mqtt broker port")
	flag.StringVar(&ID, "id", "mqtttest_program", "client ID")
	flag.Parse()
	log.SetFlags(0)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%s", addr, port))
	opts.SetClientID(ID)
	opts.SetKeepAlive(15 * time.Second)
	opts.AutoReconnect = true
	opts.CleanSession = false
	opts.SetConnectionLostHandler(func(cli mqtt.Client, err error) {
		log.Printf("Connection lost: %v", err)
	})
	opts.SetOnConnectHandler(func(cli mqtt.Client) {
		log.Printf("Connected to broker")
	})

	cli := mqtt.NewClient(opts)
	if token := cli.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Failed to connect: %v", token.Error())
	}
	end := make(chan struct{})
	cli.Subscribe("app/+/status/+", 1, func(cli mqtt.Client, msg mqtt.Message) {
		p := string(msg.Payload())
		log.Printf("Message received on topic \"%s\": %s", msg.Topic(), p)
		if p == "100" {
			end <- struct{}{}
		}
	})

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	select {
	case sig := <-ch:
		log.Printf("Interupted with signal %d", sig)
	case <-end:
		log.Printf("End message received")
	}
	cli.Disconnect(250)
}
I have increased the verbosity of the test application: https://gist.github.com/rkintzi/5db6cea978057484a9c37b67b9a872e9 (edited: logs moved to linked gist)
I see no sign in the logs that Mosquitto is trying to deliver the missing message to the test app. I don't know if this is a bug in Mosquitto, the library, or the test application. Or maybe it is expected behavior? Two additional remarks:
Looking into this further, what you are seeing makes sense; from the MQTT spec:

So, as per the spec, Mosquitto will only deliver a retained message when a new subscription is established (not upon a connection being re-established). The auto-reconnection code does not resubscribe (there is generally no need and the code has no way of knowing what topics should be active) but due to

So the approach you have discovered should work; however, I must add that if you want guaranteed delivery then you should be using a higher
Thanks, Matt, for pushing me in the right direction. I am closing the issue, as it's definitely not a library problem (I even reimplemented the test program in Dart and it behaves exactly the same way).

R.
I have two MQTT applications:
Here is the scenario:
a. Embedded application changes its internal status but for obvious reasons can not broadcast the fact.
b. Golang app does nothing (just waits for the broker and tries to reconnect).
a. The embedded application reconnects first (just a matter of luck) and publishes its new status. Messages are marked as retained and the broker receives them.
b. Then, the golang application reconnects but does not receive messages published by the embedded application (in step 3a). Reconnection happens automatically (opts.AutoReconnect = true).

I also have a third application, mosquitto_sub, which always receives messages published in step 3a above. Unfortunately, I cannot tell if it reconnects before the embedded app or after it. If the Golang app reconnects before the embedded one, messages are delivered.

My question is: is this a normal situation, or have I misconfigured something?