Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker receiving acks while connecting with clean session #400

Closed
tekjar opened this issue Feb 4, 2020 · 5 comments
Closed

Broker receiving acks while connecting with clean session #400

tekjar opened this issue Feb 4, 2020 · 5 comments

Comments

@tekjar
Copy link

tekjar commented Feb 4, 2020

Hi. I'm using this client to benchmark the broker I'm developing. Below are the logs of my broker. Client seems to be sending pending acks during automatic reconnection. packet id 2 in this case even though I've specified clean session in my client code.

395: DEBUG rumq_broker::router   > Incoming router message. Id = bench-1, Connect((Connect { protocol: MQTT(4), keep_alive: 10, client_id: "bench-1", clean_session: true, last_will: None, username: None, password: None }
396: INFO  rumq_broker::router   > Connect. Id = "bench-1"
407: DEBUG rumq_broker::router   > Incoming router message. Id = bench-1, Packet(Subscribe(Subscribe { pkid: PacketIdentifier(1), topics: [SubscribeTopic { topic_path: "hello/mqtt/rumqtt", qos: AtLeastOnce }] }))
408: DEBUG rumq_broker::router   > Subscribe. Id = "bench-1",  Topics = [SubscribeTopic { topic_path: "hello/mqtt/rumqtt", qos: AtLeastOnce }]
414: DEBUG rumq_broker::router   > Forwarding publish. Id = bench-1, Packet(Publish(Topic = hello/mqtt/rumqtt, Qos = AtLeastOnce, Pkid = Some(PacketIdentifier(1)), Payload Size = 1024))
415: DEBUG rumq_broker::router   > Incoming router message. Id = bench-1, Packet(Puback(PacketIdentifier(1)))
416: DEBUG rumq_broker::router   > Puback = PacketIdentifier(1). Id = bench-1
418: DEBUG rumq_broker::router   > Incoming router message. Id = bench-1, Packet(Publish(Topic = hello/mqtt/rumqtt, Qos = AtLeastOnce, Pkid = Some(PacketIdentifier(1)), Payload Size = 1024))
420: DEBUG rumq_broker::router   > Forwarding publish. Id = bench-1, Packet(Publish(Topic = hello/mqtt/rumqtt, Qos = AtLeastOnce, Pkid = Some(PacketIdentifier(2)), Payload Size = 1024))
426: DEBUG rumq_broker::router   > Forwarding publish. Id = bench-1, Packet(Publish(Topic = hello/mqtt/rumqtt, Qos = AtLeastOnce, Pkid = Some(PacketIdentifier(3)), Payload Size = 1024))
427: ERROR rumq_broker::router   > 1. Slow connection durin forward. Dropping handle and moving id to inactive list. Id = bench-1
429: INFO  rumq_broker             > Connection eventloop done!!. Id = "bench-1"
1487: DEBUG rumq_broker::router     > Incoming router message. Id = bench-1, Connect((Connect { protocol: MQTT(4), keep_alive: 10, client_id: "bench-1", clean_session: true, last_will: None, username: None, password: None }
1488: INFO  rumq_broker::router     > Connect. Id = "bench-1"
1494: DEBUG rumq_broker::router     > Incoming router message. Id = bench-1, Packet(Puback(PacketIdentifier(2)))
1495: DEBUG rumq_broker::router     > Puback = PacketIdentifier(2). Id = bench-1
1497: ERROR rumq_broker::router     > State error = Unsolicited(Puback(PacketIdentifier(2))). Id = bench-1
	opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
	opts.SetClientID(id)
	opts.SetProtocolVersion(4)
	opts.SetCleanSession(true)
	opts.SetKeepAlive(10 * time.Second)

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
@MattBrittan
Copy link
Contributor

I have had a quick look into this.

The message router (router.matchAndDispatch) is started when the initial connection is made and remains running from that point on (including during reconnects). The callback and ACK are sent from within a seperate goroutine by calling client.ackFunc.

In many cases the ACK will not be sent because if the reconnection is still in progress when the handler completes the ACK will be dropped. However if the handler takes a while and the reconnection completes before it does then the ACK will be sent.

I can see a few ways of addressing this; one being the approach I took in PR #381 (the router and all comms channels are stopped on reconnect).

@tekjar
Copy link
Author

tekjar commented Feb 10, 2020

Cool. I'll be happy to test when this is ready

@MattBrittan
Copy link
Contributor

Apologies for not responding earlier. Unfortunately this is not an area I have time to look into currently. Hopefully @alsm will get a chance to look at PR #381 at some point (that PR already addresses this issue as part of a major cleanup).

@alsm
Copy link
Contributor

alsm commented Apr 30, 2020

#381 is in, hopefully this will resolve the issue

@MattBrittan
Copy link
Contributor

Closing due to age/inactivity. Please feel free to reopen if this issue is occurring with the current release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants