Skip to content

Commit

Permalink
Accept cancel at publish on resume
Browse files Browse the repository at this point in the history
  • Loading branch information
kamijin-fanta committed Jun 28, 2019
1 parent adca289 commit 2c1bef9
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions client.go
Expand Up @@ -681,13 +681,19 @@ func (c *client) resume(subscription bool) {
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
token := newToken(packets.Subscribe).(*SubscribeToken)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
}
}
case *packets.UnsubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
c.oboundP <- &PacketAndToken{p: packet, t: token}
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
}
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
Expand All @@ -701,7 +707,10 @@ func (c *client) resume(subscription bool) {
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
c.obound <- &PacketAndToken{p: packet, t: token}
select {
case c.obound <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
Expand Down

0 comments on commit 2c1bef9

Please sign in to comment.