Skip to content

Commit

Permalink
Merge pull request #361 from robbawebba/fix/persist-subscribe-packets
Browse files Browse the repository at this point in the history
Persist outbound subscribe packets
  • Loading branch information
Al S-M committed Oct 23, 2019
2 parents 0d8f631 + 0621310 commit 0bdac0d
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 10 deletions.
55 changes: 52 additions & 3 deletions client.go
Expand Up @@ -668,7 +668,6 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}
sub.Topics = append(sub.Topics, topic)
sub.Qoss = append(sub.Qoss, qos)
DEBUG.Println(CLI, sub.String())

if strings.HasPrefix(topic, "$share/") {
topic = strings.Join(strings.Split(topic, "/")[2:], "/")
Expand All @@ -683,7 +682,31 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}

token.subs = append(token.subs, topic)
c.oboundP <- &PacketAndToken{p: sub, t: token}

if sub.MessageID == 0 {
sub.MessageID = c.getID(token)
token.messageID = sub.MessageID
}
DEBUG.Println(CLI, sub.String())

persistOutbound(c.persist, sub)
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit Subscribe")
return token
}
Expand Down Expand Up @@ -711,7 +734,29 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
}
token.subs = make([]string, len(sub.Topics))
copy(token.subs, sub.Topics)
c.oboundP <- &PacketAndToken{p: sub, t: token}

if sub.MessageID == 0 {
sub.MessageID = c.getID(token)
token.messageID = sub.MessageID
}
persistOutbound(c.persist, sub)
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
default:
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit SubscribeMultiple")
return token
}
Expand Down Expand Up @@ -754,7 +799,11 @@ func (c *client) resume(subscription bool) {
case *packets.SubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
subPacket := packet.(*packets.SubscribePacket)
token := newToken(packets.Subscribe).(*SubscribeToken)
token.messageID = details.MessageID
token.subs = append(token.subs, subPacket.Topics...)
c.claimID(token, details.MessageID)
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
Expand Down
86 changes: 86 additions & 0 deletions fvt_client_test.go
Expand Up @@ -1205,3 +1205,89 @@ func Test_ConnectRetryPublish(t *testing.T) {
s.Disconnect(250)
memStore.Close()
}

func Test_ResumeSubs(t *testing.T) {
topic := "/test/ResumeSubs"
var qos byte = 1
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
subMemStore := NewMemoryStore()
subMemStore.Open()
sops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("resumesubs-sub").SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2).SetResumeSubs(true).SetStore(subMemStore)

s := NewClient(sops)
sConnToken := s.Connect()

subToken := s.Subscribe(topic, qos, nil)

// Verify the subscribe packet exists in the memorystore
ids := subMemStore.All()
if len(ids) == 0 {
t.Fatalf("Expected subscribe packet to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
sp, ok := packet.(*packets.SubscribePacket)
if !ok {
t.Fatalf("Packet in store not of the expected type (%T)", packet)
}
if len(sp.Topics) != 1 || sp.Topics[0] != topic || len(sp.Qoss) != 1 || sp.Qoss[0] != qos {
t.Fatalf("Stored Subscribe Packet contents not as expected (%v, %v)", sp.Topics, sp.Qoss)
}

time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if sConnToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", sConnToken.Error())
}
if subToken.Error() != nil {
t.Fatalf("Subscribe returned error (should be persisted) (%v)", sConnToken.Error())
}

// test that the stored subscribe packet gets sent to the broker after connecting
subMemStore2 := NewMemoryStore()
subMemStore2.Open()
subMemStore2.Put(ids[0], packet)

s.Disconnect(250)

// Connect to broker and test that subscription was resumed
sops = NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-sub").
SetStore(subMemStore2).SetResumeSubs(true).SetCleanSession(false).SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2)

var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
}
sops.SetDefaultPublishHandler(f)
s = NewClient(sops).(*client)
if sConnToken = s.Connect(); sConnToken.Wait() && sConnToken.Error() != nil {
t.Fatalf("Error on valid subscribe Connect(): %v", sConnToken.Error())
}

// publish message to subscribed topic to verify subscription
pops := NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-pub").SetCleanSession(true).
SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p := NewClient(pops).(*client)
if pConnToken := p.Connect(); pConnToken.Wait() && pConnToken.Error() != nil {
t.Fatalf("Error on valid Publish.Connect(): %v", pConnToken.Error())
}

if pubToken := p.Publish(topic, 1, false, payload); pubToken.Wait() && pubToken.Error() != nil {
t.Fatalf("Error on valid Client.Publish(): %v", pubToken.Error())
}

wait(choke)

s.Disconnect(250)
p.Disconnect(250)
}
6 changes: 0 additions & 6 deletions net.go
Expand Up @@ -178,12 +178,6 @@ func outgoing(c *client) {
}
DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
case msg := <-c.oboundP:
switch msg.p.(type) {
case *packets.SubscribePacket:
msg.p.(*packets.SubscribePacket).MessageID = c.getID(msg.t)
case *packets.UnsubscribePacket:
msg.p.(*packets.UnsubscribePacket).MessageID = c.getID(msg.t)
}
DEBUG.Println(NET, "obound priority msg to write, type", reflect.TypeOf(msg.p))
if err := msg.p.Write(c.conn); err != nil {
ERROR.Println(NET, "outgoing stopped with error", err)
Expand Down
1 change: 1 addition & 0 deletions token.go
Expand Up @@ -160,6 +160,7 @@ type SubscribeToken struct {
baseToken
subs []string
subResult map[string]byte
messageID uint16
}

// Result returns a map of topics that were subscribed to along with
Expand Down
2 changes: 1 addition & 1 deletion unit_client_test.go
Expand Up @@ -104,4 +104,4 @@ func Test_isConnectionOpenNegative(t *testing.T) {
if c.IsConnectionOpen() {
t.Fail()
}
}
}

0 comments on commit 0bdac0d

Please sign in to comment.