Further issues with Commit 361 (Persist outbound subscribe packets) #371

Closed
MattBrittan opened this issue Oct 24, 2019 · 6 comments
MattBrittan commented Oct 24, 2019

Hi @alsm,

Further to issue #370, which you have just resolved, I think there are a number of other issues with pull request #361.

Using the following as a base:

sops := mqtt.NewClientOptions().AddBroker("tcp://0.0.0.0:1883").SetClientID("resumesubs-sub").SetConnectRetry(true).
	SetConnectRetryInterval(time.Second / 2).SetResumeSubs(true).SetCleanSession(false)

mc := mqtt.NewClient(sops)
sConnToken := mc.Connect()
subToken := mc.Subscribe("test", 2, nil)
sConnToken.Wait()
if err := sConnToken.Error(); err != nil {
	fmt.Printf("conn error: %v\n", err)
	return
}
fmt.Println("client connected")
subToken.Wait()
if err := subToken.Error(); err != nil {
	fmt.Printf("sub error: %v\n", err)
	return
}
fmt.Println("End Reached")

This works as is. However, if you change SetResumeSubs to false (the default) or SetCleanSession to true (also the default), you will never reach "End Reached" and no error is reported: subToken.Wait() simply never returns. The reason is that the changes to Subscribe do not take these options into account (previously the token would have been set to an error state immediately).

The above may not be too bad, as you would usually wait for the connection to come up before subscribing. However, when auto-reconnect is in use, the resume function is called with subscription set to false, meaning that the subscription requests will not be resent, which leads to the same problem (token.Wait() never returns). The test case below demonstrates this (note that the test also fails if SetResumeSubs is false).

Given these issues, I think it's worth considering reverting the change until a version that takes these factors into account is submitted.

func Test_ResumeSubsWithReconnect(t *testing.T) {
	topic := "/test/ResumeSubs"
	var qos byte = 1

	// Connect first (persistent session with ResumeSubs enabled), then subscribe and force a connection loss
	ops := NewClientOptions().SetClientID("Start").AddBroker(FVTTCP).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2).
		SetResumeSubs(true).SetCleanSession(false)
	c := NewClient(ops)
	sConnToken := c.Connect()
	sConnToken.Wait()
	if sConnToken.Error() != nil {
		t.Fatalf("Connect returned error (%v)", sConnToken.Error())
	}

	// Send subscription request and then immediately force a disconnect (hopefully before the subscribe is sent)
	subToken := c.Subscribe(topic, qos, nil)
	c.(*client).internalConnLost(fmt.Errorf("Reconnection subscription test"))

	// As reconnect is enabled the client should automatically reconnect
	subDone := make(chan bool)
	go func() {
		subToken.Wait()
		if err := subToken.Error(); err != nil {
			// Errorf rather than Fatalf: FailNow must not be called from a non-test goroutine
			t.Errorf("Subscribe returned error (should be retrying) (%v)", err)
		}
		close(subDone)
	}()
	// Wait for done or timeout
	select {
	case <-subDone:
	case <-time.After(4 * time.Second):
		t.Fatalf("Timed out waiting for subToken to complete")
	}

	c.Disconnect(250)
}

alsm commented Oct 25, 2019

Hmm, just confirms that offline messaging in the client was a terrible idea 😂

alsm pushed a commit to alsm/paho.mqtt.golang that referenced this issue Oct 25, 2019

alsm commented Oct 25, 2019

I've just created a PR that I think will fix this and included your test (although I've taken a bunch of the subscribe code directly to build the sub, token and persist it, so there should be no timing issues). Please look it over and let me know if you think I missed anything.

@robbawebba
Contributor

Hi @MattBrittan,

First of all, I'd like to apologize for any inconveniences I might have caused with PR #361, and I appreciate you highlighting the situations that I did not consider in my implementation.

@alsm I took a look at your proposed changes to improve the current situation and help fix the problems #361 caused. I like your proposed changes, but I'm still considering whether it would be better to revert my pull request altogether. You're both right that the changes I submitted provide marginal improvements at best, and they may have harmed more applications than they improved. If this feature is still wanted by you, @MattBrittan, and the rest of the community, I would be happy to help improve the implementation. Thanks for your help, and I'm sorry again about the inconvenience!

@MattBrittan
Contributor Author

@robbawebba - no worries; it's great to see more activity on this project. Unfortunately the client has a lot of options and it's easy to miss the interactions between them. I would have had a go at fixing this but ran out of time, so I raised the issue in case it was affecting anyone (I've been bitten by a similar bug in the past).

@alsm - agreed that the way offline messaging is implemented has issues. I'd been planning to take a look at your v5 client to see if I could implement this as a wrapper (I understand the need to keep things simple, but I think most use cases will require reconnect and persistence, so it seems better to implement this once as part of a supporting package rather than requiring users to implement it themselves). Your fix looks good to me; there is one edge case I can think of (c.options.ConnectRetry && c.options.CleanSession && c.connectionStatus() == connecting:) that might be worth adding to your switch (but that's probably being picky!).
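
To make the edge case concrete, here is a rough sketch of the kind of check being discussed. It is not the code from the PR: the `connState` type, the `options` struct, `checkSubscribe`, and the error values are all hypothetical names modelled on the options mentioned in this thread. The last non-default case is the suggested addition.

```go
package main

import (
	"errors"
	"fmt"
)

// connState is a hypothetical model of the client's connection status.
type connState int

const (
	disconnected connState = iota
	connecting
	reconnecting
	connected
)

// options holds only the settings discussed in this thread.
type options struct {
	ConnectRetry bool
	ResumeSubs   bool
	CleanSession bool
}

var (
	errNotConnected = errors.New("not connected")
	errSubDiscarded = errors.New("subscribe would be discarded before it could be sent")
)

// checkSubscribe returns an error when a subscribe issued in state s could
// never be delivered, so the token can fail immediately instead of leaving
// the caller blocked in Wait().
func checkSubscribe(o options, s connState) error {
	switch {
	case s == connected:
		return nil // send straight away
	case s == disconnected:
		return errNotConnected
	case !o.ResumeSubs:
		return errSubDiscarded // stored subscribes are not resent
	case o.CleanSession && s == reconnecting:
		return errSubDiscarded // session state is dropped on reconnect
	case o.ConnectRetry && o.CleanSession && s == connecting:
		return errSubDiscarded // the suggested extra case
	default:
		return nil // persist now, send once the connection is up
	}
}

func main() {
	persistent := options{ConnectRetry: true, ResumeSubs: true, CleanSession: false}
	clean := options{ConnectRetry: true, ResumeSubs: true, CleanSession: true}
	fmt.Println("persistent session while connecting:", checkSubscribe(persistent, connecting))
	fmt.Println("clean session while connecting:", checkSubscribe(clean, connecting))
}
```

Whether the last case should return an error or store the request is exactly the open question here; the sketch just shows where it would sit.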


alsm commented Oct 28, 2019

In that last case what do you expect would happen? I'm thinking that if we are in connecting state, connectretry is on and cleansession is on we should be storing the sub/unsub as normal?

@MattBrittan
Contributor Author

My view is that an error should be returned immediately; this is mainly to be consistent with the behaviour on reconnect, and because of the unpredictable behaviour that this set of parameters would cause.

With things the way they are now, the subscribe will succeed if the connection comes up within c.options.WriteTimeout (defaulting to 30s) but will error if it takes longer (and the persisted subscribe will be deleted by the c.persist.Reset() in Connect()). I think this may lead users to believe that calling subscribe before the connection is up is a good idea; it will probably work fine in testing, but once in production, comms issues could lead to the subscription being lost.

Not a big deal either way (I think it's safer to call subscribe in the OnConnect() callback rather than relying upon local persistence and broker reliability).
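
A toy model of why subscribing in the OnConnect callback is the more robust pattern. Everything here is a hypothetical stand-in (`fakeClient`, `newFakeClient`, `connect`, `subscribe`), with a map playing the role of the broker's view of the session, and a clean session assumed so that broker-side state is wiped on every connection. In the real client the callback would be registered with `SetOnConnectHandler`.

```go
package main

import "fmt"

// fakeClient is a hypothetical stand-in for an MQTT client. brokerSubs
// models the subscriptions the broker currently holds for this client.
type fakeClient struct {
	onConnect  func(c *fakeClient)
	brokerSubs map[string]byte // topic -> QoS
}

func newFakeClient(onConnect func(c *fakeClient)) *fakeClient {
	return &fakeClient{onConnect: onConnect, brokerSubs: make(map[string]byte)}
}

// connect models a (re)connection with a clean session: broker-side state
// is wiped, then the OnConnect callback runs.
func (c *fakeClient) connect() {
	c.brokerSubs = make(map[string]byte)
	if c.onConnect != nil {
		c.onConnect(c)
	}
}

// subscribe records the subscription on the modelled broker.
func (c *fakeClient) subscribe(topic string, qos byte) {
	c.brokerSubs[topic] = qos
}

func main() {
	c := newFakeClient(func(c *fakeClient) {
		c.subscribe("test", 2) // re-established on every connection
	})

	c.subscribe("early", 1) // issued before the connection is up
	c.connect()             // initial connection
	c.connect()             // models an automatic reconnect

	_, earlyKept := c.brokerSubs["early"]
	_, testKept := c.brokerSubs["test"]
	fmt.Println("early kept:", earlyKept, "test kept:", testKept)
}
```

The subscription made inside the callback survives any number of reconnects, while the one issued ahead of the connection depends on persistence and timing to get through.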


3 participants