Skip to content

Commit

Permalink
Merge pull request #374 from alsm/master
Browse files Browse the repository at this point in the history
Try and handle awkardness with sub/unsub
  • Loading branch information
Al S-M committed Oct 28, 2019
2 parents 962bb61 + 361aca5 commit 90c9a58
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
38 changes: 37 additions & 1 deletion client.go
Expand Up @@ -474,7 +474,7 @@ func (c *client) reconnect() {
go incoming(c)

c.workers.Add(1) // disconnect during resume can lead to reconnect being called before resume completes
c.resume(false)
c.resume(c.options.ResumeSubs)
}

// This function is only used for receiving a connack
Expand Down Expand Up @@ -670,6 +670,18 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
if err := validateTopicAndQos(topic, qos); err != nil {
token.setError(err)
Expand Down Expand Up @@ -730,6 +742,18 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
token.setError(err)
Expand Down Expand Up @@ -879,6 +903,18 @@ func (c *client) Unsubscribe(topics ...string) Token {
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this unsub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this unsub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
unsub.Topics = make([]string, len(topics))
copy(unsub.Topics, topics)
Expand Down
50 changes: 50 additions & 0 deletions fvt_client_test.go
Expand Up @@ -1293,3 +1293,53 @@ func Test_ResumeSubs(t *testing.T) {
s.Disconnect(250)
p.Disconnect(250)
}

func Test_ResumeSubsWithReconnect(t *testing.T) {
topic := "/test/ResumeSubs"
var qos byte = 1

// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
ops := NewClientOptions().SetClientID("Start").AddBroker(FVTTCP).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2).
SetResumeSubs(true).SetCleanSession(false)
c := NewClient(ops)
sConnToken := c.Connect()
sConnToken.Wait()
if sConnToken.Error() != nil {
t.Fatalf("Connect returned error (%v)", sConnToken.Error())
}

// Send subscription request and then immediatly force disconnect (hope it will happen before sub sent)
subToken := newToken(packets.Subscribe).(*SubscribeToken)
sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
sub.Topics = append(sub.Topics, topic)
sub.Qoss = append(sub.Qoss, qos)
subToken.subs = append(subToken.subs, topic)

if sub.MessageID == 0 {
sub.MessageID = c.(*client).getID(subToken)
subToken.messageID = sub.MessageID
}
DEBUG.Println(CLI, sub.String())

persistOutbound(c.(*client).persist, sub)
//subToken := c.Subscribe(topic, qos, nil)
c.(*client).internalConnLost(fmt.Errorf("Reconnection subscription test"))

// As reconnect is enabled the client should automatically reconnect
subDone := make(chan bool)
go func(t *testing.T) {
subToken.Wait()
if err := subToken.Error(); err != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", err)
}
close(subDone)
}(t)
// Wait for done or timeout
select {
case <-subDone:
case <-time.After(4 * time.Second):
t.Fatalf("Timed out waiting for subToken to complete")
}

c.Disconnect(250)
}

0 comments on commit 90c9a58

Please sign in to comment.