Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist outbound subscribe packets #361

Merged
merged 1 commit into from Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 52 additions & 3 deletions client.go
Expand Up @@ -668,7 +668,6 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}
sub.Topics = append(sub.Topics, topic)
sub.Qoss = append(sub.Qoss, qos)
DEBUG.Println(CLI, sub.String())

if strings.HasPrefix(topic, "$share/") {
topic = strings.Join(strings.Split(topic, "/")[2:], "/")
Expand All @@ -683,7 +682,31 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}

token.subs = append(token.subs, topic)
c.oboundP <- &PacketAndToken{p: sub, t: token}

if sub.MessageID == 0 {
sub.MessageID = c.getID(token)
token.messageID = sub.MessageID
}
DEBUG.Println(CLI, sub.String())

persistOutbound(c.persist, sub)
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit Subscribe")
return token
}
Expand Down Expand Up @@ -711,7 +734,29 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
}
token.subs = make([]string, len(sub.Topics))
copy(token.subs, sub.Topics)
c.oboundP <- &PacketAndToken{p: sub, t: token}

if sub.MessageID == 0 {
sub.MessageID = c.getID(token)
token.messageID = sub.MessageID
}
persistOutbound(c.persist, sub)
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
default:
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit SubscribeMultiple")
return token
}
Expand Down Expand Up @@ -754,7 +799,11 @@ func (c *client) resume(subscription bool) {
case *packets.SubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
subPacket := packet.(*packets.SubscribePacket)
token := newToken(packets.Subscribe).(*SubscribeToken)
token.messageID = details.MessageID
token.subs = append(token.subs, subPacket.Topics...)
c.claimID(token, details.MessageID)
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
Expand Down
88 changes: 87 additions & 1 deletion fvt_client_test.go
Expand Up @@ -1181,7 +1181,7 @@ func Test_ConnectRetryPublish(t *testing.T) {

// disconnect and then reconnect with correct server
p.Disconnect(250)

pops = NewClientOptions().AddBroker(FVTTCP).SetClientID("crp-pub").SetCleanSession(false).
SetStore(memStore2).SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p = NewClient(pops).(*client)
Expand All @@ -1198,3 +1198,89 @@ func Test_ConnectRetryPublish(t *testing.T) {
s.Disconnect(250)
memStore.Close()
}

func Test_ResumeSubs(t *testing.T) {
topic := "/test/ResumeSubs"
var qos byte = 1
payload := "sample Payload"
choke := make(chan bool)

// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
subMemStore := NewMemoryStore()
subMemStore.Open()
sops := NewClientOptions().AddBroker("256.256.256.256").SetClientID("resumesubs-sub").SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2).SetResumeSubs(true).SetStore(subMemStore)

s := NewClient(sops)
sConnToken := s.Connect()

subToken := s.Subscribe(topic, qos, nil)

// Verify the subscribe packet exists in the memorystore
ids := subMemStore.All()
if len(ids) == 0 {
t.Fatalf("Expected subscribe packet to be in store")
} else if len(ids) != 1 {
t.Fatalf("Expected 1 packet to be in store")
}
packet := subMemStore.Get(ids[0])
if packet == nil {
t.Fatal("Failed to retrieve packet from store")
}
sp, ok := packet.(*packets.SubscribePacket)
if !ok {
t.Fatalf("Packet in store not of the expected type (%T)", packet)
}
if len(sp.Topics) != 1 || sp.Topics[0] != topic || len(sp.Qoss) != 1 || sp.Qoss[0] != qos {
t.Fatalf("Stored Subscribe Packet contents not as expected (%v, %v)", sp.Topics, sp.Qoss)
}

time.Sleep(time.Second) // Wait a second to ensure we are past SetConnectRetryInterval
if sConnToken.Error() != nil {
t.Fatalf("Connect returned error (should be retrying) (%v)", sConnToken.Error())
}
if subToken.Error() != nil {
t.Fatalf("Subscribe returned error (should be persisted) (%v)", sConnToken.Error())
}

// test that the stored subscribe packet gets sent to the broker after connecting
subMemStore2 := NewMemoryStore()
subMemStore2.Open()
subMemStore2.Put(ids[0], packet)

s.Disconnect(250)

// Connect to broker and test that subscription was resumed
sops = NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-sub").
SetStore(subMemStore2).SetResumeSubs(true).SetCleanSession(false).SetConnectRetry(true).
SetConnectRetryInterval(time.Second / 2)

var f MessageHandler = func(client Client, msg Message) {
if msg.Topic() != topic || string(msg.Payload()) != payload {
t.Fatalf("Received unexpected message: %v, %v", msg.Topic(), msg.Payload())
}
choke <- true
}
sops.SetDefaultPublishHandler(f)
s = NewClient(sops).(*client)
if sConnToken = s.Connect(); sConnToken.Wait() && sConnToken.Error() != nil {
t.Fatalf("Error on valid subscribe Connect(): %v", sConnToken.Error())
}

// publish message to subscribed topic to verify subscription
pops := NewClientOptions().AddBroker(FVTTCP).SetClientID("resumesubs-pub").SetCleanSession(true).
SetConnectRetry(true).SetConnectRetryInterval(time.Second / 2)
p := NewClient(pops).(*client)
if pConnToken := p.Connect(); pConnToken.Wait() && pConnToken.Error() != nil {
t.Fatalf("Error on valid Publish.Connect(): %v", pConnToken.Error())
}

if pubToken := p.Publish(topic, 1, false, payload); pubToken.Wait() && pubToken.Error() != nil {
t.Fatalf("Error on valid Client.Publish(): %v", pubToken.Error())
}

wait(choke)

s.Disconnect(250)
p.Disconnect(250)
}
6 changes: 0 additions & 6 deletions net.go
Expand Up @@ -178,12 +178,6 @@ func outgoing(c *client) {
}
DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
case msg := <-c.oboundP:
switch msg.p.(type) {
case *packets.SubscribePacket:
msg.p.(*packets.SubscribePacket).MessageID = c.getID(msg.t)
case *packets.UnsubscribePacket:
msg.p.(*packets.UnsubscribePacket).MessageID = c.getID(msg.t)
}
DEBUG.Println(NET, "obound priority msg to write, type", reflect.TypeOf(msg.p))
if err := msg.p.Write(c.conn); err != nil {
ERROR.Println(NET, "outgoing stopped with error", err)
Expand Down
1 change: 1 addition & 0 deletions token.go
Expand Up @@ -160,6 +160,7 @@ type SubscribeToken struct {
baseToken
subs []string
subResult map[string]byte
messageID uint16
}

// Result returns a map of topics that were subscribed to along with
Expand Down
2 changes: 1 addition & 1 deletion unit_client_test.go
Expand Up @@ -104,4 +104,4 @@ func Test_isConnectionOpenNegative(t *testing.T) {
if c.IsConnectionOpen() {
t.Fail()
}
}
}